Skip to content

Commit

Permalink
Merge pull request #2293 from botpress/ya-mt
Browse files Browse the repository at this point in the history
fix(core): added multi-thread support for heavy tasks
  • Loading branch information
allardy committed Sep 10, 2019
2 parents 17e7eab + 1f654e9 commit d2b30e4
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 28 deletions.
7 changes: 7 additions & 0 deletions api.rest
Expand Up @@ -32,6 +32,13 @@ text=hello how are you today
&reftime={{$datetime "xx"}}
&tz=America/Toronto


### Reboot server
POST {{baseUrl}}/api/v1/admin/server/rebootServer
Authorization: Bearer {{authToken}}
X-BP-Workspace: default


### Test Lang Server
GET https://lang-01.botpress.io/info

Expand Down
20 changes: 10 additions & 10 deletions modules/nlu/src/backend/pipelines/intents/svm_classifier.ts
Expand Up @@ -93,6 +93,9 @@ export default class SVMClassifier {
}

public async train(intentDefs: sdk.NLU.IntentDefinition[], modelHash: string): Promise<Model[]> {
const svmOptions: Partial<sdk.MLToolkit.SVM.SVMOptions> = { kernel: 'LINEAR', classifier: 'C_SVC' }
const svm = new this.toolkit.SVM.Trainer()

this.realtime.sendPayload(
this.realtimePayload.forAdmins('statusbar.event', getProgressPayload(identityProgress)(0.1))
)
Expand Down Expand Up @@ -204,28 +207,25 @@ export default class SVMClassifier {
}
}

const svm = new this.toolkit.SVM.Trainer({ kernel: 'LINEAR', classifier: 'C_SVC' })

const ratioedProgressForIndex = ratioedProgress(index)

await svm.train(l1Points, progress => {
const updateProgress = progress => {
this.realtime.sendPayload(
this.realtimePayload.forAdmins('statusbar.event', getProgressPayload(ratioedProgressForIndex)(progress))
)
debugTrain('SVM => progress for INT', { context, progress })
})
}

const modelStr = svm.serialize()
const ratioedProgressForIndex = ratioedProgress(index)
const modelStr = await svm.train(l1Points, svmOptions, updateProgress)

models.push({
meta: { context, created_on: Date.now(), hash: modelHash, scope: 'bot', type: 'intent-l1' },
model: new Buffer(modelStr, 'utf8')
})
}

const svm = new this.toolkit.SVM.Trainer({ kernel: 'LINEAR', classifier: 'C_SVC' })
await svm.train(l0Points, progress => debugTrain('SVM => progress for CTX %d', progress))
const ctxModelStr = svm.serialize()
const ctxModelStr = await svm.train(l0Points, svmOptions, progress =>
debugTrain('SVM => progress for CTX %d', progress)
)

this.l1Tfidf = _.mapValues(l1Tfidf, x => x['__avg__'])
this.l0Tfidf = l0Tfidf['__avg__']
Expand Down
7 changes: 7 additions & 0 deletions src/bp/bootstrap.ts
Expand Up @@ -6,13 +6,15 @@ import './common/polyfills'

import sdk from 'botpress/sdk'
import chalk from 'chalk'
import cluster from 'cluster'
import { Botpress, Config, Db, Ghost, Logger } from 'core/app'
import center from 'core/logger/center'
import { ModuleLoader } from 'core/module-loader'
import ModuleResolver from 'core/modules/resolver'
import fs from 'fs'
import os from 'os'

import { setupMasterNode } from './cluster'
import { FatalError } from './errors'

async function setupEnv() {
Expand All @@ -23,6 +25,11 @@ async function setupEnv() {
}

async function start() {
if (cluster.isMaster) {
// The master process only needs getos and rewire
return setupMasterNode(await Logger('Cluster'))
}

await setupEnv()

const logger = await Logger('Launcher')
Expand Down
52 changes: 52 additions & 0 deletions src/bp/cluster.ts
@@ -0,0 +1,52 @@
import sdk from 'botpress/sdk'
import cluster from 'cluster'
import yn from 'yn'

// Debug logger for this module; DEBUG is a project-wide global (presumably a namespaced debug factory — confirm)
const debug = DEBUG('cluster')

// Registry of handlers keyed by the `type` field of messages received from worker processes.
// Entries are added through registerMsgHandler and looked up when a worker message arrives.
const msgHandlers: { [messageType: string]: (message: any, worker: cluster.Worker) => void } = {}

/**
 * Associates a handler with a worker message type.
 *
 * Background: the master process handles training and rebooting the server,
 * while the worker process runs the actual server. Workers talk to the master
 * by sending messages carrying a `type`; the handler registered for that type
 * is invoked with the message and the worker that sent it.
 *
 * Worker exit codes, as interpreted by the master:
 *  - Exit code 0: success, the worker is not respawned
 *  - Exit code 1: error, the master will try to respawn a worker
 *
 * Registering a handler for a type that already has one replaces it.
 *
 * @param messageType Value of the `type` field this handler responds to
 * @param handler Function called with the received message and the sending worker
 */
export function registerMsgHandler(
  messageType: string,
  handler: (message: any, worker: cluster.Worker) => void
): void {
  msgHandlers[messageType] = handler
}

/**
 * Configures the master process and forks the first worker.
 *
 * - Registers the 'reboot_server' handler, which disconnects and kills the requesting worker.
 * - When a worker exits with code 0, the master exits as well; for any other exit a new
 *   worker is forked unless BP_DISABLE_AUTO_RESTART is set.
 * - Dispatches every message received from a worker to the handler registered for its `type`.
 *
 * @param logger Logger used to report restarts and message-processing failures
 */
export const setupMasterNode = (logger: sdk.Logger) => {
  registerMsgHandler('reboot_server', (message, worker) => {
    logger.warn(`Restarting server...`)
    worker.disconnect()
    worker.kill()
  })

  cluster.on('exit', (worker, code, signal) => {
    debug(`Process exiting %o`, { workerId: worker.id, code, signal })
    if (code === 0) {
      process.exit(0)
    }

    if (!yn(process.core_env.BP_DISABLE_AUTO_RESTART)) {
      cluster.fork()
    }
  })

  cluster.on('message', (worker: cluster.Worker, message: any) => {
    // A worker may send anything (including null); never let a malformed message throw in the listener
    const messageType = message && message.type
    const handler = msgHandlers[messageType]
    if (!handler) {
      return logger.error(`No handler configured for ${messageType}`)
    }

    const reportFailure = (err: any) =>
      logger.attachError(err).error(`Error while processing worker message ${messageType}`)

    try {
      // Handlers may be async: try/catch only sees synchronous throws, so the returned
      // value is wrapped in a promise to also catch (and log) asynchronous rejections.
      Promise.resolve(handler(message, worker)).catch(reportFailure)
    } catch (err) {
      reportFailure(err)
    }
  })

  cluster.fork()
}
10 changes: 1 addition & 9 deletions src/bp/core/routers/admin/server.ts
Expand Up @@ -86,15 +86,7 @@ export class ServerRouter extends CustomRouter {

res.sendStatus(200)

// Timeout is only to allow the response to reach the asking user
setTimeout(() => {
spawn(process.argv[0], process.argv.slice(1), {
detached: true,
stdio: 'inherit'
}).unref()

process.exit()
}, 100)
process.send && process.send({ type: 'reboot_server' })
})
)

Expand Down
4 changes: 2 additions & 2 deletions src/bp/core/services/migration/index.ts
Expand Up @@ -68,7 +68,7 @@ export class MigrationService {
await this.logger.error(
`Botpress needs to migrate your data. Please make a copy of your data, then start it with "./bp --auto-migrate"`
)
process.exit(1)
process.exit(0)
}

await this.executeMigrations(missingMigrations)
Expand Down Expand Up @@ -146,7 +146,7 @@ export class MigrationService {
await this.logger.error(
`Some steps failed to complete. Please fix errors manually, then restart Botpress so the update process may finish.`
)
process.exit(1)
process.exit(0)
}

await this.updateAllVersions()
Expand Down
4 changes: 2 additions & 2 deletions src/bp/ml/svm.test.ts
Expand Up @@ -13,8 +13,8 @@ test('Trainer', async () => {
{ coordinates: [1, 1], label: 'B' }
]

const trainer = new Trainer({ classifier: 'C_SVC', kernel: 'LINEAR', c: 1 })
await trainer.train(line)
const trainer = new Trainer()
await trainer.train(line, { classifier: 'C_SVC', kernel: 'LINEAR', c: 1 })

const predictor = new Predictor(trainer.serialize())

Expand Down
14 changes: 12 additions & 2 deletions src/bp/ml/svm.ts
Expand Up @@ -16,8 +16,15 @@ export class Trainer implements sdk.MLToolkit.SVM.Trainer {
private model?: any
private report?: any

constructor(options: Partial<sdk.MLToolkit.SVM.SVMOptions> = DefaultTrainArgs) {
constructor() {}

async train(
points: sdk.MLToolkit.SVM.DataPoint[],
options: Partial<sdk.MLToolkit.SVM.SVMOptions> = DefaultTrainArgs,
callback?: sdk.MLToolkit.SVM.TrainProgressCallback | undefined
): Promise<string> {
const args = { ...DefaultTrainArgs, ...options }

this.clf = new binding.SVM({
svmType: args.classifier,
kernelType: args.kernel,
Expand All @@ -27,9 +34,12 @@ export class Trainer implements sdk.MLToolkit.SVM.Trainer {
probability: true,
kFold: 4
})

await this._train(points, callback)
return this.serialize()
}

async train(
private async _train(
points: sdk.MLToolkit.SVM.DataPoint[],
callback?: sdk.MLToolkit.SVM.TrainProgressCallback | undefined
): Promise<any> {
Expand Down
37 changes: 37 additions & 0 deletions src/bp/ml/toolkit.ts
@@ -1,6 +1,8 @@
import * as sdk from 'botpress/sdk'
import cluster from 'cluster'
import kmeans from 'ml-kmeans'

import { registerMsgHandler } from '../cluster'
const { Tagger, Trainer: CRFTrainer } = require('./crfsuite')
import { FastTextModel } from './fasttext'
import computeJaroWinklerDistance from './homebrew/jaro-winkler'
Expand All @@ -25,4 +27,39 @@ const MLToolkit: typeof sdk.MLToolkit = {
SentencePiece: { createProcessor: processor }
}

// In a worker process, SVM training is not run locally: Trainer.train is replaced by a
// version that sends the training request to the master process and waits for its reply.
if (cluster.isWorker) {
  // Counter used to tag each training request so that replies can be matched to their request
  let lastRequestId = 0

  /**
   * Sends the points and options to the master process and resolves with the `result`
   * of the matching 'svm_trained' reply.
   *
   * @param points Training data points forwarded to the master
   * @param options SVM options forwarded to the master
   * @param progressCb Called with the `progress` value of each matching 'progress' message
   * @returns Promise resolved with the reply's `result`, or rejected when an 'svm_error' reply is received
   */
  MLToolkit.SVM.Trainer.prototype.train = (
    points: sdk.MLToolkit.SVM.DataPoint[],
    options?: Partial<sdk.MLToolkit.SVM.SVMOptions>,
    progressCb?: sdk.MLToolkit.SVM.TrainProgressCallback | undefined
  ): any => {
    const id = ++lastRequestId

    return Promise.fromCallback(completedCb => {
      const messageHandler = msg => {
        // Several trainings can be pending at once and all of them listen on the same channel.
        // A reply carrying another request's id is not ours; a reply without any id is still
        // accepted so this keeps working with a master that does not echo the id.
        if (!msg || (msg.id !== undefined && msg.id !== id)) {
          return
        }

        if (progressCb && msg.type === 'progress') {
          progressCb(msg.progress)
        }

        if (msg.type === 'svm_trained') {
          process.off('message', messageHandler)
          completedCb(undefined, msg.result)
        }

        // Failure reply (if the master sends one): reject instead of waiting forever
        if (msg.type === 'svm_error') {
          process.off('message', messageHandler)
          completedCb(new Error(msg.error))
        }
      }

      process.send!({ type: 'svm_train', points, options, id })
      process.on('message', messageHandler)
    })
  }
}

// In the master process, handle 'svm_train' requests coming from workers: train the SVM here
// and send progress updates and the final result back to the requesting worker.
if (cluster.isMaster) {
  registerMsgHandler('svm_train', async (msg, worker) => {
    // Every reply echoes the request's id (when the worker provided one) so the worker can
    // match replies to requests. Nothing is sent if the worker is no longer connected.
    const sendToWorker = event => worker.isConnected() && worker.send({ ...event, id: msg.id })

    let result: string
    try {
      const svm = new SVMTrainer()
      result = await svm.train(msg.points, msg.options, progress => sendToWorker({ type: 'progress', progress }))
    } catch (err) {
      // Tell the worker the training failed so it is not left waiting for a result,
      // then rethrow so whoever dispatched this message can also observe the failure.
      sendToWorker({ type: 'svm_error', error: err instanceof Error ? err.message : String(err) })
      throw err
    }

    sendToWorker({ type: 'svm_trained', result })
  })
}

export default MLToolkit
5 changes: 2 additions & 3 deletions src/bp/sdk/botpress.d.ts
Expand Up @@ -301,10 +301,9 @@ declare module 'botpress/sdk' {
}

export class Trainer {
constructor(options?: Partial<SVMOptions>)
train(points: DataPoint[], callback?: TrainProgressCallback): Promise<void>
constructor()
train(points: DataPoint[], options?: Partial<SVMOptions>, callback?: TrainProgressCallback): Promise<string>
isTrained(): boolean
serialize(): string
}

export class Predictor {
Expand Down
6 changes: 6 additions & 0 deletions src/typings/global.d.ts
Expand Up @@ -95,6 +95,12 @@ declare type BotpressEnvironementVariables = {
* Defaults to '1gb'
*/
readonly BP_MAX_MEMORY_CACHE_SIZE?: string

/**
* When set to true, Botpress will not automatically restart on crash
* @default false
*/
readonly BP_DISABLE_AUTO_RESTART?: boolean
}

interface IDebug {
Expand Down

0 comments on commit d2b30e4

Please sign in to comment.