Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/dynamic pools #237

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/master/pool-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export type PoolEvent<ThreadType extends Thread> = {
export interface WorkerDescriptor<ThreadType extends Thread> {
init: Promise<ThreadType>
runningTasks: Array<Promise<any>>
lastActivity: number
}

/**
Expand Down
74 changes: 67 additions & 7 deletions src/master/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,13 @@ function spawnWorkers<ThreadType extends Thread>(
spawnWorker: () => Promise<ThreadType>,
count: number
): Array<WorkerDescriptor<ThreadType>> {
return createArray(count).map((): WorkerDescriptor<ThreadType> => ({
init: spawnWorker(),
runningTasks: []
}))
return createArray(count).map(
(): WorkerDescriptor<ThreadType> => ({
init: spawnWorker(),
runningTasks: [],
lastActivity: Date.now()
})
)
}

/**
Expand Down Expand Up @@ -109,6 +112,15 @@ export interface PoolOptions {

/** No. of worker threads to spawn and to be managed by the pool. */
size?: number

/** For dynamically sized pool, the minimum number of active workers. */
minSize?: number

/** For dynamically sized pool, maximum idle lifetime of worker threads. */
idleTimeoutMillis?: number

/** For dynamically sized pool, frequency of cleaning up idle threads. */
idleCleanupIntervalMillis?: number
}

class WorkerPool<ThreadType extends Thread> implements Pool<ThreadType> {
Expand All @@ -124,6 +136,11 @@ class WorkerPool<ThreadType extends Thread> implements Pool<ThreadType> {
private isClosing = false
private nextTaskID = 1
private taskQueue: Array<QueuedTask<ThreadType, any>> = []
private maxSize: number
private minSize: number
private idleTimeoutMillis?: number
private spawnWorker: () => Promise<ThreadType>
private idleCleanupHandle: NodeJS.Timeout | undefined

constructor(
spawnWorker: () => Promise<ThreadType>,
Expand All @@ -134,13 +151,26 @@ class WorkerPool<ThreadType extends Thread> implements Pool<ThreadType> {
: optionsOrSize || {}

const { size = defaultPoolSize } = options
this.maxSize = size
this.minSize = options.minSize ?? 0
this.idleTimeoutMillis = options.idleTimeoutMillis
this.spawnWorker = spawnWorker

this.debug = DebugLogger(`threads:pool:${slugify(options.name || String(nextPoolID++))}`)
this.options = options
this.workers = spawnWorkers(spawnWorker, size)

const initialWorkers = this.idleTimeoutMillis ? this.minSize : size
this.workers = spawnWorkers(spawnWorker, initialWorkers)

this.eventObservable = multicast(Observable.from(this.eventSubject))

if (this.idleTimeoutMillis) {
this.idleCleanupHandle = setInterval(
() => this.purgeExpiredWorkers(),
options.idleCleanupIntervalMillis ?? 30 * 1000
)
}

Promise.all(this.workers.map(worker => worker.init)).then(
() => this.eventSubject.next({
type: PoolEventType.initialized,
Expand Down Expand Up @@ -191,6 +221,8 @@ class WorkerPool<ThreadType extends Thread> implements Pool<ThreadType> {
}

private async run(worker: WorkerDescriptor<ThreadType>, task: QueuedTask<ThreadType, any>) {
worker.lastActivity = Date.now()

const runPromise = (async () => {
const removeTaskFromWorkersRunningTasks = () => {
worker.runningTasks = worker.runningTasks.filter(someRunPromise => someRunPromise !== runPromise)
Expand All @@ -202,6 +234,7 @@ class WorkerPool<ThreadType extends Thread> implements Pool<ThreadType> {
try {
await this.runPoolTask(worker, task)
} finally {
worker.lastActivity = Date.now()
removeTaskFromWorkersRunningTasks()

if (!this.isClosing) {
Expand All @@ -213,12 +246,35 @@ class WorkerPool<ThreadType extends Thread> implements Pool<ThreadType> {
worker.runningTasks.push(runPromise)
}

private async purgeExpiredWorkers() {
const idleWorkers: WorkerDescriptor<ThreadType>[] = this.workers.filter(
w => w.lastActivity < Date.now() - this.idleTimeoutMillis!
)
const rmWorkers: WorkerDescriptor<ThreadType>[] = []
while (idleWorkers.length && this.workers.length > this.minSize) {
const rmWorker = idleWorkers.shift()!
rmWorkers.push(rmWorker)
const idx = this.workers.findIndex(w => w === rmWorker)
this.workers.splice(idx, 1)
}

Promise.all(rmWorkers.map(
async (w) => Thread.terminate(await w.init)
))
}

private scheduleWork() {
this.debug(`Attempt de-queueing a task in order to run it...`)

const availableWorker = this.findIdlingWorker()
if (!availableWorker) return
let availableWorker = this.findIdlingWorker()
if (!availableWorker) {
if (this.workers.length < this.maxSize) {
[availableWorker] = spawnWorkers(this.spawnWorker, 1)
this.workers.push(availableWorker)
} else {
return
}
}

const nextTask = this.taskQueue.shift()
if (!nextTask) {
Expand Down Expand Up @@ -371,6 +427,10 @@ class WorkerPool<ThreadType extends Thread> implements Pool<ThreadType> {

public async terminate(force?: boolean) {
this.isClosing = true
if (this.idleCleanupHandle) {
clearInterval(this.idleCleanupHandle!)
}

if (!force) {
await this.completed(true)
}
Expand Down