Skip to content

Commit

Permalink
fix(worker): do not reuse a process if it exited since last usage (#135)
Browse files Browse the repository at this point in the history
* fix(engine): undefined pid in error loggin

* fix(worker): do not reuse a process if it exited since last usage

* chore(worker): log warning when a worker dies between 2 trainings
  • Loading branch information
franklevasseur committed Nov 23, 2021
1 parent 985e585 commit 9c3ab74
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 19 deletions.
Expand Up @@ -40,7 +40,7 @@ export class TrainingProcessPool {
throw new TrainingAlreadyStarted()
}
if (errors.isTaskExitedUnexpectedly(err)) {
throw new TrainingExitedUnexpectedly(err.pid, err.info)
throw new TrainingExitedUnexpectedly(err.wid, err.info)
}
throw err
}
Expand Down
5 changes: 0 additions & 5 deletions packages/worker/src/process-pool.ts
@@ -1,6 +1,5 @@
import child_process, { ForkOptions } from 'child_process'
import yn from 'yn'
import { SIG_KILL } from './signals'
import { Logger, PoolOptions, EntryPointOptions } from './typings'
import { WorkerPool } from './worker-pool'
import { Worker } from './worker-pool/worker'
Expand All @@ -26,10 +25,6 @@ export class ProcessPool<I, O> extends WorkerPool<I, O> {
public isMainWorker = () => {
return !yn(process.env.CHILD)
}

public cancel(id: string) {
this._scheduler.cancel(id, (w) => (w.innerWorker.worker as child_process.ChildProcess).kill(SIG_KILL))
}
}

export class ProcessEntyPoint<I, O> extends WorkerEntryPoint<I, O> {
Expand Down
7 changes: 6 additions & 1 deletion packages/worker/src/worker-pool/index.ts
Expand Up @@ -19,12 +19,13 @@ import { Scheduler } from './scheduler'
import { Worker } from './worker'

export abstract class WorkerPool<I, O> implements IWorkerPool<I, O> {
protected _scheduler = new Scheduler(() => this._createNewWorker(), { maxItems: this.config.maxWorkers })
protected _scheduler: Scheduler

private errorHandler: ErrorDeserializer

constructor(protected logger: Logger, private config: PoolOptions) {
this.errorHandler = config.errorHandler ?? new ErrorHandler()
this._scheduler = new Scheduler(() => this._createNewWorker(), this.logger, { maxItems: this.config.maxWorkers })
}

abstract createWorker: (entryPoint: string, env: NodeJS.ProcessEnv) => Promise<Worker>
Expand Down Expand Up @@ -56,6 +57,10 @@ export abstract class WorkerPool<I, O> implements IWorkerPool<I, O> {
return output
}

public cancel(id: string) {
return this._scheduler.cancel(id)
}

private async _startTask(worker: Worker, input: I, progress: (x: number) => void): Promise<O> {
const msg: OutgoingMessage<'start_task', I> = {
type: 'start_task',
Expand Down
41 changes: 29 additions & 12 deletions packages/worker/src/worker-pool/scheduler.ts
@@ -1,19 +1,25 @@
import _ from 'lodash'
import { Logger } from '../typings'
import { Worker } from './worker'

interface Options {
maxItems: number
}

type Generator<T> = () => Promise<T>
type Generator = () => Promise<Worker>

type ItemCallback = (item: Worker) => void

type ItemCallback<T> = (item: T) => void
export class Scheduler {
private ready: Worker[] = []
private active: { [id: string]: Worker } = {}
private waiting: ItemCallback[] = []

export class Scheduler<T> {
private ready: T[] = []
private active: { [id: string]: T } = {}
private waiting: ItemCallback<T>[] = []
constructor(private _generator: Generator, private logger: Logger, private _options: Options) {}

constructor(private _generator: Generator<T>, private _options: Options) {}
public async getNext(id: string): Promise<Worker> {
this.ready = this._filterOutDeadWorkers(this.ready)

public async getNext(id: string): Promise<T> {
const readyCount = this.ready.length
const activeCount = Object.values(this.active).length
const totalCount = readyCount + activeCount
Expand All @@ -27,24 +33,35 @@ export class Scheduler<T> {
const isPlaceLeft = this._options.maxItems < 0 || this._options.maxItems > totalCount
if (!readyCount && isPlaceLeft) {
const newItem = await this._generator()
newItem.isAlive()
this.active[id] = newItem
return newItem
}

return new Promise((resolve) => {
this.waiting.push((item: T) => {
this.waiting.push((item: Worker) => {
this.active[id] = item
resolve(item)
})
})
}

public cancel(id: string, cancel: (item: T) => void): void {
private _filterOutDeadWorkers = (workers: Worker[]): Worker[] => {
const [alive, dead] = _.partition(workers, (w) => w.isAlive())
if (dead.length) {
const formattedDeads = dead.map((w) => w.wid).join(', ')
this.logger.warning(`The following workers have died since last usage: [${formattedDeads}]`)
}
return alive
}

public cancel(id: string): void {
const item = this.active[id]
if (!item) {
return
}
cancel(item)

item.cancel()

delete this.active[id]
}
Expand All @@ -53,7 +70,7 @@ export class Scheduler<T> {
return !!this.active[id]
}

public releaseItem(id: string, item: T) {
public releaseItem(id: string, item: Worker) {
delete this.active[id]

if (!this.waiting.length) {
Expand Down
17 changes: 17 additions & 0 deletions packages/worker/src/worker-pool/worker.ts
@@ -1,5 +1,6 @@
import { ChildProcess } from 'child_process'
import { Worker as Thread } from 'worker_threads'
import { SIG_KILL } from '../signals'

type InnerWorker =
| {
Expand Down Expand Up @@ -30,6 +31,22 @@ export class Worker {
this._innerWorker.worker.send(msg)
}

public cancel() {
if (this._innerWorker.type === 'thread') {
// not implemented for threads
return
}
return (this.innerWorker.worker as ChildProcess).kill(SIG_KILL)
}

public isAlive(): boolean {
if (this._innerWorker.type === 'thread') {
// currently no way of telling if thread exited
return true
}
return this._innerWorker.worker.connected
}

public get wid() {
if (this._innerWorker.type === 'thread') {
return this._innerWorker.worker.threadId
Expand Down

0 comments on commit 9c3ab74

Please sign in to comment.