Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow user to specify number of workers #2829

Merged
merged 3 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/core/rpc/BaseRpcDriver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class MockRpcDriver extends BaseRpcDriver {

workerCheckFrequency = 500

async makeWorker(_pluginManager: PluginManager) {
async makeWorker() {
return new MockWorkerHandle()
}
}
Expand Down
87 changes: 35 additions & 52 deletions packages/core/rpc/BaseRpcDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { objectFromEntries } from '../util'
import { serializeAbortSignal } from './remoteAbortSignals'
import PluginManager from '../PluginManager'
import { AnyConfigurationModel } from '../configuration/configurationSchema'
import { readConfObject } from '../configuration'

export interface WorkerHandle {
status?: string
Expand Down Expand Up @@ -70,30 +71,31 @@ function detectHardwareConcurrency() {
return 1
}
class LazyWorker {
worker?: WorkerHandle
workerP?: Promise<WorkerHandle> | undefined

constructor(public driver: BaseRpcDriver) {}

async getWorker(pluginManager: PluginManager, rpcDriverClassName: string) {
if (!this.worker) {
const worker = await this.driver.makeWorker(pluginManager)
watchWorker(worker, this.driver.maxPingTime, rpcDriverClassName).catch(
error => {
if (this.worker) {
console.error(
'worker did not respond, killing and generating new one',
)
console.error(error)
this.worker.destroy()
this.worker.status = 'killed'
this.worker.error = error
this.worker = undefined
}
},
)
this.worker = worker
async getWorker() {
if (!this.workerP) {
this.workerP = this.driver.makeWorker().then(worker => {
watchWorker(worker, this.driver.maxPingTime, this.driver.name).catch(
error => {
if (worker) {
console.error(
'worker did not respond, killing and generating new one',
)
console.error(error)
worker.destroy()
worker.status = 'killed'
worker.error = error
this.workerP = undefined
}
},
)
return worker
})
}
return this.worker
return this.workerP
}
}

Expand All @@ -104,9 +106,7 @@ export default abstract class BaseRpcDriver {

private workerAssignments = new Map<string, number>() // sessionId -> worker number

private workerCount = 0

abstract makeWorker(pluginManager: PluginManager): Promise<WorkerHandle>
abstract makeWorker(): Promise<WorkerHandle>

private workerPool?: LazyWorker[]

Expand All @@ -121,24 +121,18 @@ export default abstract class BaseRpcDriver {
}

// filter the given object and just remove any non-clonable things from it
filterArgs<THING_TYPE>(
thing: THING_TYPE,
pluginManager: PluginManager,
sessionId: string,
): THING_TYPE {
filterArgs<THING_TYPE>(thing: THING_TYPE, sessionId: string): THING_TYPE {
if (Array.isArray(thing)) {
return thing
.filter(isClonable)
.map(t =>
this.filterArgs(t, pluginManager, sessionId),
) as unknown as THING_TYPE
.map(t => this.filterArgs(t, sessionId)) as unknown as THING_TYPE
}
if (typeof thing === 'object' && thing !== null) {
// AbortSignals are specially handled
if (thing instanceof AbortSignal) {
return serializeAbortSignal(
thing,
this.remoteAbort.bind(this, pluginManager, sessionId),
this.remoteAbort.bind(this, sessionId),
) as unknown as THING_TYPE
}

Expand All @@ -155,19 +149,14 @@ export default abstract class BaseRpcDriver {
return objectFromEntries(
Object.entries(thing)
.filter(e => isClonable(e[1]))
.map(([k, v]) => [k, this.filterArgs(v, pluginManager, sessionId)]),
.map(([k, v]) => [k, this.filterArgs(v, sessionId)]),
) as THING_TYPE
}
return thing
}

async remoteAbort(
pluginManager: PluginManager,
sessionId: string,
functionName: string,
signalId: number,
) {
const worker = await this.getWorker(sessionId, pluginManager)
async remoteAbort(sessionId: string, functionName: string, signalId: number) {
const worker = await this.getWorker(sessionId)
worker.call(
functionName,
{ signalId },
Expand All @@ -179,7 +168,8 @@ export default abstract class BaseRpcDriver {
const hardwareConcurrency = detectHardwareConcurrency()

const workerCount =
this.workerCount || Math.max(1, Math.ceil((hardwareConcurrency - 2) / 3))
readConfObject(this.config, 'workerCount') ||
Math.max(1, Math.ceil((hardwareConcurrency - 2) / 3))

return [...new Array(workerCount)].map(() => new LazyWorker(this))
}
Expand All @@ -193,10 +183,7 @@ export default abstract class BaseRpcDriver {
return this.workerPool
}

async getWorker(
sessionId: string,
pluginManager: PluginManager,
): Promise<WorkerHandle> {
async getWorker(sessionId: string): Promise<WorkerHandle> {
const workers = this.getWorkerPool()
let workerNumber = this.workerAssignments.get(sessionId)
if (workerNumber === undefined) {
Expand All @@ -207,7 +194,7 @@ export default abstract class BaseRpcDriver {
}

// console.log(`${sessionId} -> worker ${workerNumber}`)
const worker = workers[workerNumber].getWorker(pluginManager, this.name)
const worker = workers[workerNumber].getWorker()
if (!worker) {
throw new Error('no web workers registered for RPC')
}
Expand All @@ -225,14 +212,10 @@ export default abstract class BaseRpcDriver {
throw new TypeError('sessionId is required')
}
let done = false
const worker = await this.getWorker(sessionId, pluginManager)
const worker = await this.getWorker(sessionId)
const rpcMethod = pluginManager.getRpcMethodType(functionName)
const serializedArgs = await rpcMethod.serializeArguments(args, this.name)
const filteredAndSerializedArgs = this.filterArgs(
serializedArgs,
pluginManager,
sessionId,
)
const filteredAndSerializedArgs = this.filterArgs(serializedArgs, sessionId)

// now actually call the worker
const callP = worker
Expand Down
36 changes: 23 additions & 13 deletions packages/core/rpc/RpcManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,32 @@ export default class RpcManager {
if (driver) {
return driver
}
let newDriver
const config = this.mainConfiguration.drivers.get('WebWorkerRpcDriver')
if (backendName === 'MainThreadRpcDriver') {
const backendConfiguration =
this.backendConfigurations.MainThreadRpcDriver

const backendConfiguration = this.backendConfigurations[backendName]
const DriverClassImpl = DriverClasses[backendName]

if (!DriverClassImpl) {
if (!backendConfiguration) {
throw new Error(
`requested RPC driver "${backendName}" is missing config`,
)
}
newDriver = new MainThreadRpcDriver({ ...backendConfiguration, config })
} else if (backendName === 'WebWorkerRpcDriver') {
const backendConfiguration = this.backendConfigurations.WebWorkerRpcDriver
if (!backendConfiguration) {
throw new Error(
`requested RPC driver "${backendName}" is missing config`,
)
}
newDriver = new WebWorkerRpcDriver(
{ ...backendConfiguration, config },
{ plugins: this.pluginManager.runtimePluginDefinitions },
)
} else {
throw new Error(`requested RPC driver "${backendName}" is not installed`)
}

if (!backendConfiguration) {
throw new Error(`requested RPC driver "${backendName}" is missing config`)
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const newDriver = new DriverClassImpl(backendConfiguration as any, {
plugins: this.pluginManager.runtimePluginDefinitions,
})
this.driverObjects.set(backendName, newDriver)
return newDriver
}
Expand Down
22 changes: 14 additions & 8 deletions packages/core/rpc/configSchema.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
import { types } from 'mobx-state-tree'
import { ConfigurationSchema } from '../configuration'

const BaseRpcDriverConfigSchema = ConfigurationSchema(
'MainThreadRpcDriver',
garrettjstevens marked this conversation as resolved.
Show resolved Hide resolved
{
workerCount: {
type: 'number',
description:
'The number of workers to use. If 0 (the default) JBrowse will decide how many workers to use.',
defaultValue: 0,
},
},
{ explicitlyTyped: true },
)
const MainThreadRpcDriverConfigSchema = ConfigurationSchema(
'MainThreadRpcDriver',
{},
{ explicitlyTyped: true },
{ explicitlyTyped: true, baseConfiguration: BaseRpcDriverConfigSchema },
)
const WebWorkerRpcDriverConfigSchema = ConfigurationSchema(
'WebWorkerRpcDriver',
{},
{ explicitlyTyped: true },
)
const ElectronRpcDriverConfigSchema = ConfigurationSchema(
'ElectronRpcDriver',
{},
{ explicitlyTyped: true },
{ explicitlyTyped: true, baseConfiguration: BaseRpcDriverConfigSchema },
)

export default ConfigurationSchema(
Expand All @@ -31,7 +38,6 @@ export default ConfigurationSchema(
types.union(
MainThreadRpcDriverConfigSchema,
WebWorkerRpcDriverConfigSchema,
ElectronRpcDriverConfigSchema,
),
),
{ MainThreadRpcDriver: { type: 'MainThreadRpcDriver' } },
Expand Down
14 changes: 10 additions & 4 deletions products/jbrowse-web/src/Loader.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,16 @@ const Renderer = observer(
},
{ pluginManager },
)
rootModel.jbrowse.configuration.rpc.addDriverConfig(
'WebWorkerRpcDriver',
{ type: 'WebWorkerRpcDriver' },
)
if (
!rootModel.jbrowse.configuration.rpc.drivers.get(
'WebWorkerRpcDriver',
)
) {
rootModel.jbrowse.configuration.rpc.addDriverConfig(
'WebWorkerRpcDriver',
{ type: 'WebWorkerRpcDriver' },
)
}
if (!loader.configSnapshot?.configuration?.rpc?.defaultDriver) {
rootModel.jbrowse.configuration.rpc.defaultDriver.set(
'WebWorkerRpcDriver',
Expand Down