From fab0c80f19087ed1c2c5c0754f1cf2895acdb994 Mon Sep 17 00:00:00 2001 From: hyj1991 Date: Mon, 6 Jun 2022 10:44:30 +0800 Subject: [PATCH] feat: support options startMode --- .eslintrc | 7 +- .github/workflows/nodejs.yml | 8 +- lib/agent_worker.js | 15 +- lib/app_worker.js | 17 +- lib/master.js | 294 ++++++------------ lib/utils/manager.js | 14 +- lib/utils/messenger.js | 24 +- lib/utils/mode/base/agent.js | 79 +++++ lib/utils/mode/base/app.js | 101 ++++++ lib/utils/mode/impl/process/agent.js | 117 +++++++ lib/utils/mode/impl/process/app.js | 151 +++++++++ lib/utils/mode/impl/worker_threads/agent.js | 100 ++++++ lib/utils/mode/impl/worker_threads/app.js | 5 + lib/utils/options.js | 1 + lib/utils/terminate.js | 16 +- package.json | 9 +- test/agent_worker.test.js | 4 +- test/app_worker.test.js | 4 +- .../apps/agent-worker-threads-error/agent.js | 6 + .../agent-worker-threads-error/package.json | 3 + .../apps/agent-worker-threads/agent.js | 7 + .../apps/agent-worker-threads/package.json | 3 + .../apps/script-start/start-server.js | 9 +- test/worker_threads.test.js | 31 ++ 24 files changed, 763 insertions(+), 262 deletions(-) create mode 100644 lib/utils/mode/base/agent.js create mode 100644 lib/utils/mode/base/app.js create mode 100644 lib/utils/mode/impl/process/agent.js create mode 100644 lib/utils/mode/impl/process/app.js create mode 100644 lib/utils/mode/impl/worker_threads/agent.js create mode 100644 lib/utils/mode/impl/worker_threads/app.js create mode 100644 test/fixtures/apps/agent-worker-threads-error/agent.js create mode 100644 test/fixtures/apps/agent-worker-threads-error/package.json create mode 100644 test/fixtures/apps/agent-worker-threads/agent.js create mode 100644 test/fixtures/apps/agent-worker-threads/package.json create mode 100644 test/worker_threads.test.js diff --git a/.eslintrc b/.eslintrc index c799fe5..f8bdcf3 100644 --- a/.eslintrc +++ b/.eslintrc @@ -1,3 +1,6 @@ { - "extends": "eslint-config-egg" -} + "extends": "eslint-config-egg", + "parserOptions": { + "ecmaVersion": 13 + } +} \ No newline at end of file diff --git a/.github/workflows/nodejs.yml b/.github/workflows/nodejs.yml index c6afbad..4df7840 100644 --- a/.github/workflows/nodejs.yml +++ b/.github/workflows/nodejs.yml @@ -22,7 +22,7 @@ jobs: strategy: fail-fast: false matrix: - node-version: [12, 14, 16] + node-version: [14, 16] os: [ubuntu-latest, macos-latest] steps: @@ -30,17 +30,17 @@ jobs: uses: actions/checkout@v2 - name: Use Node.js ${{ matrix.node-version }} - uses: actions/setup-node@v1 + uses: actions/setup-node@v3 with: node-version: ${{ matrix.node-version }} - name: Install Dependencies - run: npm i -g npminstall && npminstall + run: npm i - name: Continuous Integration run: npm run ci - name: Code Coverage - uses: codecov/codecov-action@v1 + uses: codecov/codecov-action@v3 with: token: ${{ secrets.CODECOV_TOKEN }} diff --git a/lib/agent_worker.js b/lib/agent_worker.js index 81b5f5c..5d5fcb5 100644 --- a/lib/agent_worker.js +++ b/lib/agent_worker.js @@ -17,8 +17,14 @@ if (options.require) { }); } +let AgentWorker; +if (options.startMode === 'worker_threads') { + AgentWorker = require('./utils/mode/impl/worker_threads/agent').AgentWorker; +} else { + AgentWorker = require('./utils/mode/impl/process/agent').AgentWorker; +} + const debug = require('debug')('egg-cluster'); -const gracefulExit = require('graceful-process'); const ConsoleLogger = require('egg-logger').EggConsoleLogger; const consoleLogger = new ConsoleLogger({ level: process.env.EGG_AGENT_WORKER_LOGGER_LEVEL }); @@ -35,8 +41,7 @@ try { function startErrorHandler(err) { consoleLogger.error(err); consoleLogger.error('[agent_worker] start error, exiting with code:1'); - process.exitCode = 1; - process.kill(process.pid); + AgentWorker.kill(); } agent.ready(err => { @@ -44,13 +49,13 @@ agent.ready(err => { if (err) return; agent.removeListener('error', startErrorHandler); - process.send({ action: 'agent-start', to: 'master' }); + AgentWorker.send({ action: 'agent-start', to: 'master' }); }); // exit if agent start error agent.once('error', startErrorHandler); -gracefulExit({ +AgentWorker.gracefulExit({ logger: consoleLogger, label: 'agent_worker', beforeExit: () => agent.close(), diff --git a/lib/app_worker.js b/lib/app_worker.js index 4d3305a..88aae9e 100644 --- a/lib/app_worker.js +++ b/lib/app_worker.js @@ -9,9 +9,15 @@ if (options.require) { }); } +let AppWorker; +if (options.startMode === 'worker_threads') { + AppWorker = require('./utils/mode/impl/worker_threads/app').AppWorker; +} else { + AppWorker = require('./utils/mode/impl/process/app').AppWorker; +} + const fs = require('fs'); const debug = require('debug')('egg-cluster'); -const gracefulExit = require('graceful-process'); const ConsoleLogger = require('egg-logger').EggConsoleLogger; const consoleLogger = new ConsoleLogger({ level: process.env.EGG_APP_WORKER_LOGGER_LEVEL, @@ -31,7 +37,7 @@ const httpsOptions = Object.assign({}, clusterConfig.https, options.https); const port = options.port = options.port || listenConfig.port; const protocol = (httpsOptions.key && httpsOptions.cert) ? 'https' : 'http'; -process.send({ +AppWorker.send({ to: 'master', action: 'realport', data: { @@ -44,8 +50,7 @@ app.ready(startServer); function exitProcess() { // Use SIGTERM kill process, ensure trigger the gracefulExit - process.exitCode = 1; - process.kill(process.pid); + AppWorker.kill(); } // exit if worker start timeout @@ -89,7 +94,7 @@ function startServer(err) { if (options.sticky) { server.listen(options.stickyWorkerPort, '127.0.0.1'); // Listen to messages sent from the master. Ignore everything else. - process.on('message', (message, connection) => { + AppWorker.on('message', (message, connection) => { if (message !== 'sticky-session:connection') { return; } @@ -116,7 +121,7 @@ function startServer(err) { } } -gracefulExit({ +AppWorker.gracefulExit({ logger: consoleLogger, label: 'app_worker', beforeExit: () => app.close(), diff --git a/lib/master.js b/lib/master.js index cb81d98..280f81d 100644 --- a/lib/master.js +++ b/lib/master.js @@ -7,20 +7,19 @@ const path = require('path'); const fs = require('fs'); const cluster = require('cluster'); const EventEmitter = require('events'); -const childprocess = require('child_process'); -const cfork = require('cfork'); const ready = require('get-ready'); const GetFreePort = require('detect-port'); const ConsoleLogger = require('egg-logger').EggConsoleLogger; const utility = require('utility'); -const semver = require('semver'); -const co = require('co'); const { mkdirp } = require('mz-modules'); const Manager = require('./utils/manager'); const parseOptions = require('./utils/options'); const Messenger = require('./utils/messenger'); -const terminate = require('./utils/terminate'); +const { AgentUtils: ProcessAgentWorker } = require('./utils/mode/impl/process/agent'); +const { AppUtils: ProcessAppWorker } = require('./utils/mode/impl/process/app'); +const { AgentUtils: WorkerThreadsAgentWorker } = require('./utils/mode/impl/worker_threads/agent'); +const { AppUtils: WorkerThreadsAppWorker } = require('./utils/mode/impl/worker_threads/app'); const PROTOCOL = Symbol('Master#protocol'); const REAL_PORT = Symbol('Master#real_port'); @@ -44,7 +43,7 @@ class Master extends EventEmitter { super(); this.options = parseOptions(options); this.workerManager = new Manager(); - this.messenger = new Messenger(this); + this.messenger = new Messenger(this, this.workerManager); ready.mixin(this); @@ -66,6 +65,13 @@ class Master extends EventEmitter { const frameworkPath = this.options.framework; const frameworkPkg = utility.readJSONSync(path.join(frameworkPath, 'package.json')); + // set app & agent worker impl + if (this.options.startMode === 'worker_threads') { + this.startByWorkerThreads(); + } else { + this.startByProcess(); + } + this.log(`[master] =================== ${frameworkPkg.name} start =====================`); this.logger.info(`[master] node version ${process.version}`); /* istanbul ignore next */ @@ -170,6 +176,36 @@ class Master extends EventEmitter { }); } + startByProcess() { + this.agentWorker = new ProcessAgentWorker(this.options, { + log: this.log.bind(this), + logger: this.logger, + messenger: this.messenger, + }); + + this.appWorker = new ProcessAppWorker(this.options, { + log: this.log.bind(this), + logger: this.logger, + messenger: this.messenger, + isProduction: this.isProduction, + }); + } + + startByWorkerThreads() { + this.agentWorker = new WorkerThreadsAgentWorker(this.options, { + log: this.log.bind(this), + logger: this.logger, + messenger: this.messenger, + }); + + this.appWorker = new WorkerThreadsAppWorker(this.options, { + log: this.log.bind(this), + logger: this.logger, + messenger: this.messenger, + isProduction: this.isProduction, + }); + } + detectPorts() { // Detect cluster client port return GetFreePort() @@ -196,10 +232,6 @@ class Master extends EventEmitter { this.logger[this.logMethod](...args); } - get agentWorker() { - return this.workerManager.agent; - } - startMasterSocketServer(cb) { // Create the outside facing server listening on our port. require('net').createServer({ @@ -217,7 +249,7 @@ class Master extends EventEmitter { connection.destroy(); } else { const worker = this.stickyWorker(connection.remoteAddress); - worker.send('sticky-session:connection', connection); + worker.instance.send('sticky-session:connection', connection); } }).listen(this[REAL_PORT], cb); } @@ -238,141 +270,13 @@ class Master extends EventEmitter { } forkAgentWorker() { - this.agentStartTime = Date.now(); - - const args = [ JSON.stringify(this.options) ]; - const opt = {}; - - if (process.platform === 'win32') opt.windowsHide = true; - - // add debug execArgv - const debugPort = process.env.EGG_AGENT_DEBUG_PORT || 5800; - if (this.options.isDebug) opt.execArgv = process.execArgv.concat([ `--${semver.gte(process.version, '8.0.0') ? 'inspect' : 'debug'}-port=${debugPort}` ]); - - const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt); - agentWorker.status = 'starting'; - agentWorker.id = ++this.agentWorkerIndex; - this.workerManager.setAgent(agentWorker); - this.log('[master] agent_worker#%s:%s start with clusterPort:%s', - agentWorker.id, agentWorker.pid, this.options.clusterPort); - - // send debug message - if (this.options.isDebug) { - this.messenger.send({ - to: 'parent', - from: 'agent', - action: 'debug', - data: { - debugPort, - pid: agentWorker.pid, - }, - }); - } - // forwarding agent' message to messenger - agentWorker.on('message', msg => { - if (typeof msg === 'string') { - msg = { - action: msg, - data: msg, - }; - } - msg.from = 'agent'; - this.messenger.send(msg); - }); - agentWorker.on('error', err => { - err.name = 'AgentWorkerError'; - err.id = agentWorker.id; - err.pid = agentWorker.pid; - this.logger.error(err); - }); - // agent exit message - agentWorker.once('exit', (code, signal) => { - this.messenger.send({ - action: 'agent-exit', - data: { - code, - signal, - }, - to: 'master', - from: 'agent', - }); - }); + this.agentWorker.on('agent_forked', agent => this.workerManager.setAgent(agent)); + this.agentWorker.fork(); } forkAppWorkers() { - this.appStartTime = Date.now(); - this.isAllAppWorkerStarted = false; - this.startSuccessCount = 0; - const args = [ JSON.stringify(this.options) ]; - this.log('[master] start appWorker with args %j', args); - cfork({ - exec: this.getAppWorkerFile(), - args, - silent: false, - count: this.options.workers, - // don't refork in local env - refork: this.isProduction, - windowsHide: process.platform === 'win32', - }); - - let debugPort = process.debugPort; - cluster.on('fork', worker => { - worker.disableRefork = true; - this.workerManager.setWorker(worker); - worker.on('message', msg => { - if (typeof msg === 'string') { - msg = { - action: msg, - data: msg, - }; - } - msg.from = 'app'; - this.messenger.send(msg); - }); - this.log('[master] app_worker#%s:%s start, state: %s, current workers: %j', - worker.id, worker.process.pid, worker.state, Object.keys(cluster.workers)); - - // send debug message, due to `brk` scence, send here instead of app_worker.js - if (this.options.isDebug) { - debugPort++; - this.messenger.send({ - to: 'parent', - from: 'app', - action: 'debug', - data: { - debugPort, - pid: worker.process.pid, - }, - }); - } - }); - cluster.on('disconnect', worker => { - this.logger.info('[master] app_worker#%s:%s disconnect, suicide: %s, state: %s, current workers: %j', - worker.id, worker.process.pid, worker.exitedAfterDisconnect, worker.state, Object.keys(cluster.workers)); - }); - cluster.on('exit', (worker, code, signal) => { - this.messenger.send({ - action: 'app-exit', - data: { - workerPid: worker.process.pid, - code, - signal, - }, - to: 'master', - from: 'app', - }); - }); - cluster.on('listening', (worker, address) => { - this.messenger.send({ - action: 'app-start', - data: { - workerPid: worker.process.pid, - address, - }, - to: 'master', - from: 'app', - }); - }); + this.appWorker.on('worker_forked', worker => this.workerManager.setWorker(worker)); + this.appWorker.fork(); } /** @@ -384,25 +288,12 @@ class Master extends EventEmitter { * @param {number} timeout - kill agent timeout * @return {Promise} - */ - killAgentWorker(timeout) { - const agentWorker = this.agentWorker; - if (agentWorker) { - this.log('[master] kill agent worker with signal SIGTERM'); - agentWorker.removeAllListeners(); - } - return co(function* () { - yield terminate(agentWorker, timeout); - }); + async killAgentWorker(timeout) { + await this.agentWorker.kill(timeout); } - killAppWorkers(timeout) { - return co(function* () { - yield Object.keys(cluster.workers).map(id => { - const worker = cluster.workers[id]; - worker.disableRefork = true; - return terminate(worker, timeout); - }); - }); + async killAppWorkers(timeout) { + await this.appWorker.kill(timeout); } /** @@ -421,15 +312,15 @@ class Master extends EventEmitter { data: [], }); const agentWorker = this.agentWorker; - this.workerManager.deleteAgent(this.agentWorker); + this.workerManager.deleteAgent(agentWorker); const err = new Error(util.format('[master] agent_worker#%s:%s died (code: %s, signal: %s)', - agentWorker.id, agentWorker.pid, data.code, data.signal)); + agentWorker.instance.id, agentWorker.instance.workerId, data.code, data.signal)); err.name = 'AgentWorkerDiedError'; this.logger.error(err); // remove all listeners to avoid memory leak - agentWorker.removeAllListeners(); + agentWorker.clean(); if (this.isStarted) { this.log('[master] try to start a new agent_worker after 1s ...'); @@ -443,16 +334,16 @@ class Master extends EventEmitter { }); } else { this.logger.error('[master] agent_worker#%s:%s start fail, exiting with code:1', - agentWorker.id, agentWorker.pid); + agentWorker.instance.id, agentWorker.instance.workerId); process.exit(1); } } onAgentStart() { - this.agentWorker.status = 'started'; + this.agentWorker.instance.status = 'started'; // Send egg-ready when agent is started after launched - if (this.isAllAppWorkerStarted) { + if (this.appWorker.isAllWorkerStarted) { this.messenger.send({ action: 'egg-ready', to: 'agent', @@ -463,7 +354,7 @@ class Master extends EventEmitter { this.messenger.send({ action: 'egg-pids', to: 'app', - data: [ this.agentWorker.pid ], + data: [ this.agentWorker.instance.workerId ], }); // should send current worker pids when agent restart if (this.isStarted) { @@ -479,26 +370,26 @@ class Master extends EventEmitter { to: 'app', }); this.logger.info('[master] agent_worker#%s:%s started (%sms)', - this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime); + this.agentWorker.instance.id, this.agentWorker.instance.workerId, Date.now() - this.agentWorker.startTime); } /** * App Worker exit handler * @param {Object} data - * - {String} workerPid - worker id + * - {String} workerId - worker id * - {Number} code - exit code * - {String} signal - received signal */ onAppExit(data) { if (this.closed) return; - const worker = this.workerManager.getWorker(data.workerPid); + const worker = this.workerManager.getWorker(data.workerId); if (!worker.isDevReload) { const signal = data.signal; const message = util.format( '[master] app_worker#%s:%s died (code: %s, signal: %s, suicide: %s, state: %s), current workers: %j', - worker.id, worker.process.pid, worker.process.exitCode, signal, + worker.id, worker.workerId, worker.exitCode, signal, worker.exitedAfterDisconnect, worker.state, Object.keys(cluster.workers) ); @@ -515,8 +406,8 @@ class Master extends EventEmitter { } // remove all listeners to avoid memory leak - worker.removeAllListeners(); - this.workerManager.deleteWorker(data.workerPid); + worker.clean(); + this.workerManager.deleteWorker(data.workerId); // send message to agent with alive workers this.messenger.send({ action: 'egg-pids', @@ -524,7 +415,7 @@ class Master extends EventEmitter { data: this.workerManager.getListeningWorkerIds(), }); - if (this.isAllAppWorkerStarted) { + if (this.appWorker.isAllWorkerStarted) { // cfork will only refork at production mode this.messenger.send({ action: 'app-worker-died', @@ -534,7 +425,7 @@ class Master extends EventEmitter { } else { // exit if died during startup this.logger.error('[master] app_worker#%s:%s start fail, exiting with code:1', - worker.id, worker.process.pid); + worker.id, worker.workerId); process.exit(1); } } @@ -542,11 +433,11 @@ class Master extends EventEmitter { /** * after app worker * @param {Object} data - * - {String} workerPid - worker id + * - {String} workerId - worker id * - {Object} address - server address */ onAppStart(data) { - const worker = this.workerManager.getWorker(data.workerPid); + const worker = this.workerManager.getWorker(data.workerId); const address = data.address; // worker should listen stickyWorkerPort when sticky mode @@ -567,14 +458,14 @@ class Master extends EventEmitter { data: this.workerManager.getListeningWorkerIds(), }); - this.startSuccessCount++; + this.appWorker.startSuccessCount++; - const remain = this.isAllAppWorkerStarted ? 0 : this.options.workers - this.startSuccessCount; + const remain = this.appWorker.isAllWorkerStarted ? 0 : this.options.workers - this.appWorker.startSuccessCount; this.log('[master] app_worker#%s:%s started at %s, remain %s (%sms)', - worker.id, data.workerPid, address.port, remain, Date.now() - this.appStartTime); + worker.id, worker.workerId, address.port, remain, Date.now() - this.appWorker.startTime); // Send egg-ready when app is started after launched - if (this.isAllAppWorkerStarted) { + if (this.appWorker.isAllWorkerStarted) { this.messenger.send({ action: 'egg-ready', to: 'app', @@ -583,15 +474,15 @@ class Master extends EventEmitter { } // if app is started, it should enable this worker - if (this.isAllAppWorkerStarted) { + if (this.appWorker.isAllWorkerStarted) { worker.disableRefork = false; } - if (this.isAllAppWorkerStarted || this.startSuccessCount < this.options.workers) { + if (this.appWorker.isAllWorkerStarted || this.appWorker.startSuccessCount < this.options.workers) { return; } - this.isAllAppWorkerStarted = true; + this.appWorker.isAllWorkerStarted = true; // enable all workers when app started for (const id in cluster.workers) { @@ -657,30 +548,19 @@ class Master extends EventEmitter { require('cluster-reload')(this.options.workers); } - close() { + async close() { this.closed = true; - const self = this; - co(function* () { - try { - yield self._doClose(); - self.log('[master] close done, exiting with code:0'); - process.exit(0); - } catch (e) /* istanbul ignore next */ { - this.logger.error('[master] close with error: ', e); - process.exit(1); - } - }); - } - - getAgentWorkerFile() { - return path.join(__dirname, 'agent_worker.js'); - } - - getAppWorkerFile() { - return path.join(__dirname, 'app_worker.js'); + try { + await this._doClose(); + this.log('[master] close done, exiting with code:0'); + process.exit(0); + } catch (e) /* istanbul ignore next */ { + this.logger.error('[master] close with error: ', e); + process.exit(1); + } } - * _doClose() { + async _doClose() { // kill app workers // kill agent worker // exit itself @@ -690,14 +570,14 @@ class Master extends EventEmitter { this.logger.info('[master] send kill SIGTERM to app workers, will exit with code:0 after %sms', appTimeout); this.logger.info('[master] wait %sms', appTimeout); try { - yield this.killAppWorkers(appTimeout); + await this.killAppWorkers(appTimeout); } catch (e) /* istanbul ignore next */ { this.logger.error('[master] app workers exit error: ', e); } this.logger.info('[master] send kill SIGTERM to agent worker, will exit with code:0 after %sms', agentTimeout); this.logger.info('[master] wait %sms', agentTimeout); try { - yield this.killAgentWorker(agentTimeout); + await this.killAgentWorker(agentTimeout); } catch (e) /* istanbul ignore next */ { this.logger.error('[master] agent worker exit error: ', e); } diff --git a/lib/utils/manager.js b/lib/utils/manager.js index d931ec5..3bb38f8 100644 --- a/lib/utils/manager.js +++ b/lib/utils/manager.js @@ -15,20 +15,24 @@ class Manager extends EventEmitter { this.agent = agent; } + getAgent() { + return this.agent; + } + deleteAgent() { this.agent = null; } setWorker(worker) { - this.workers.set(worker.process.pid, worker); + this.workers.set(worker.workerId, worker); } - getWorker(pid) { - return this.workers.get(pid); + getWorker(workerId) { + return this.workers.get(workerId); } - deleteWorker(pid) { - this.workers.delete(pid); + deleteWorker(workerId) { + this.workers.delete(workerId); } listWorkerIds() { diff --git a/lib/utils/messenger.js b/lib/utils/messenger.js index 9019f23..7c95b4e 100644 --- a/lib/utils/messenger.js +++ b/lib/utils/messenger.js @@ -1,8 +1,7 @@ 'use strict'; -const cluster = require('cluster'); -const sendmessage = require('sendmessage'); const debug = require('debug')('egg-cluster:messenger'); +const workerThreads = require('worker_threads'); /** @@ -53,9 +52,10 @@ const debug = require('debug')('egg-cluster:messenger'); */ class Messenger { - constructor(master) { + constructor(master, workerManager) { this.master = master; - this.hasParent = !!process.send; + this.workerManager = workerManager; + this.hasParent = !!workerThreads.parentPort || !!process.send; process.on('message', msg => { msg.from = 'parent'; this.send(msg); @@ -80,7 +80,7 @@ class Messenger { if (data.receiverPid) { if (data.receiverPid === String(process.pid)) { data.to = 'master'; - } else if (data.receiverPid === String(this.master.agentWorker.pid)) { + } else if (data.receiverPid === String(this.workerManager.getAgent().workerId)) { data.to = 'agent'; } else { data.to = 'app'; @@ -153,16 +153,17 @@ class Messenger { * @param {Object} data message body */ sendToAppWorker(data) { - for (const id in cluster.workers) { - const worker = cluster.workers[id]; + const workerManager = this.workerManager; + for (const id of workerManager.listWorkerIds()) { + const worker = workerManager.getWorker(id); if (worker.state === 'disconnected') { continue; } // check receiverPid - if (data.receiverPid && data.receiverPid !== String(worker.process.pid)) { + if (data.receiverPid && data.receiverPid !== String(worker.workerId)) { continue; } - sendmessage(worker, data); + worker.send(data); } } @@ -171,8 +172,9 @@ class Messenger { * @param {Object} data message body */ sendToAgentWorker(data) { - if (this.master.agentWorker) { - sendmessage(this.master.agentWorker, data); + const agent = this.workerManager.getAgent(); + if (agent) { + agent.send(data); } } diff --git a/lib/utils/mode/base/agent.js b/lib/utils/mode/base/agent.js new file mode 100644 index 0000000..8e95499 --- /dev/null +++ b/lib/utils/mode/base/agent.js @@ -0,0 +1,79 @@ +/* istanbul ignore file */ +'use strict'; + +const path = require('path'); +const EventEmitter = require('events').EventEmitter; + +class BaseAgentWorker { + constructor(instance) { + this.instance = instance; + } + + get workerId() { + throw new Error('BaseAgentWorker should implement getter workerId.'); + } + + get id() { + return this.instance.id; + } + + get status() { + return this.instance.status; + } + + set id(id) { + this.instance.id = id; + } + + set status(status) { + this.instance.status = status; + } + + send() { + throw new Error('BaseAgentWorker should implement send.'); + } + + static send() { + throw new Error('BaseAgentWorker should implement send.'); + } + + static kill() { + throw new Error('BaseAgentWorker should implement kill.'); + } + + static gracefulExit() { + throw new Error('BaseAgentWorker should implement gracefulExit.'); + } +} + +class BaseAgentUtils extends EventEmitter { + constructor(options, { log, logger, messenger }) { + super(); + this.options = options; + this.log = log; + this.logger = logger; + this.messenger = messenger; + + // public attrs + this.startTime = 0; + this.instance = null; + } + + getAgentWorkerFile() { + return path.join(__dirname, '../../../agent_worker.js'); + } + + fork() { + throw new Error('BaseAgent should implement fork.'); + } + + clean() { + throw new Error('BaseAgent should implement clean.'); + } + + kill() { + throw new Error('BaseAgent should implement kill.'); + } +} + +module.exports = { BaseAgentWorker, BaseAgentUtils }; diff --git a/lib/utils/mode/base/app.js b/lib/utils/mode/base/app.js new file mode 100644 index 0000000..464f313 --- /dev/null +++ b/lib/utils/mode/base/app.js @@ -0,0 +1,101 @@ +/* istanbul ignore file */ +'use strict'; + +const path = require('path'); +const EventEmitter = require('events').EventEmitter; + +class BaseAppWorker { + constructor(instance) { + this.instance = instance; + } + + get id() { + throw new Error('BaseAppWorker should implement getter id.'); + } + + get workerId() { + throw new Error('BaseAppWorker should implement getter workerId.'); + } + + get state() { + throw new Error('BaseAppWorker should implement getter state.'); + } + + get exitedAfterDisconnect() { + throw new Error('BaseAppWorker should implement getter exitedAfterDisconnect.'); + } + + get exitCode() { + throw new Error('BaseAppWorker should implement getter exitCode.'); + } + + get disableRefork() { + return this.instance.disableRefork; + } + + get isDevReload() { + return this.instance.isDevReload; + } + + set disableRefork(status) { + this.instance.disableRefork = status; + } + + set isDevReload(status) { + this.instance.isDevReload = status; + } + + send() { + throw new Error('BaseAppWorker should implement send.'); + } + + clean() { + throw new Error('BaseAppWorker should implement clean.'); + } + + static on() { + throw new Error('BaseAppWorker should implement on.'); + } + + static send() { + throw new Error('BaseAppWorker should implement send.'); + } + + static kill() { + throw new Error('BaseAppWorker should implement kill.'); + } + + static gracefulExit() { + throw new Error('BaseAppWorker should implement gracefulExit.'); + } +} + +class BaseAppUtils extends EventEmitter { + constructor(options, { log, logger, messenger, isProduction }) { + super(); + this.options = options; + this.log = log; + this.logger = logger; + this.messenger = messenger; + this.isProduction = isProduction; + + // public attrs + this.startTime = 0; + this.startSuccessCount = 0; + this.isAllWorkerStarted = false; + } + + getAppWorkerFile() { + return path.join(__dirname, '../../../app_worker.js'); + } + + fork() { + throw new Error('BaseApp should implement fork.'); + } + + kill() { + throw new Error('BaseApp should implement kill.'); + } +} + +module.exports = { BaseAppWorker, BaseAppUtils }; diff --git a/lib/utils/mode/impl/process/agent.js b/lib/utils/mode/impl/process/agent.js new file mode 100644 index 0000000..659bda8 --- /dev/null +++ b/lib/utils/mode/impl/process/agent.js @@ -0,0 +1,117 @@ +'use strict'; + +const childprocess = require('child_process'); +const semver = require('semver'); +const sendmessage = require('sendmessage'); +const gracefulExit = require('graceful-process'); + +const { BaseAgentWorker, BaseAgentUtils } = require('../../base/agent'); +const terminate = require('../../../terminate'); + +class AgentWorker extends BaseAgentWorker { + get workerId() { + return this.instance.pid; + } + + send(...args) { + sendmessage(this.instance, ...args); + } + + static send(data) { + process.send(data); + } + + static kill() { + process.exitCode = 1; + process.kill(process.pid); + } + + static gracefulExit(options) { + gracefulExit(options); + } +} + +class AgentUtils extends BaseAgentUtils { + #worker = null; + #id = 0; + + fork() { + this.startTime = Date.now(); + + const args = [ JSON.stringify(this.options) ]; + const opt = {}; + + if (process.platform === 'win32') opt.windowsHide = true; + + // add debug execArgv + const debugPort = process.env.EGG_AGENT_DEBUG_PORT || 5800; + if (this.options.isDebug) opt.execArgv = process.execArgv.concat([ `--${semver.gte(process.version, '8.0.0') ? 'inspect' : 'debug'}-port=${debugPort}` ]); + + const worker = this.#worker = childprocess.fork(this.getAgentWorkerFile(), args, opt); + const agentWorker = this.instance = new AgentWorker(worker); + this.emit('agent_forked', agentWorker); + agentWorker.status = 'starting'; + agentWorker.id = ++this.#id; + this.log('[master] agent_worker#%s:%s start with clusterPort:%s', + agentWorker.id, agentWorker.workerId, this.options.clusterPort); + + // send debug message + if (this.options.isDebug) { + this.messenger.send({ + to: 'parent', + from: 'agent', + action: 'debug', + data: { + debugPort, + pid: agentWorker.workerId, + }, + }); + } + // forwarding agent' message to messenger + worker.on('message', msg => { + if (typeof msg === 'string') { + msg = { + action: msg, + data: msg, + }; + } + msg.from = 'agent'; + this.messenger.send(msg); + }); + worker.on('error', err => { + err.name = 'AgentWorkerError'; + err.id = worker.id; + err.pid = agentWorker.workerId; + this.logger.error(err); + }); + // agent exit message + worker.once('exit', (code, signal) => { + this.messenger.send({ + action: 'agent-exit', + data: { + code, + signal, + }, + to: 'master', + from: 'agent', + }); + }); + + return this; + } + + clean() { + this.#worker.removeAllListeners(); + } + + async kill(timeout) { + const worker = this.#worker; + if (worker) { + this.log('[master] kill agent worker with signal SIGTERM'); + this.clean(); + await terminate(worker, timeout); + } + } +} + +module.exports = { AgentWorker, AgentUtils }; diff --git a/lib/utils/mode/impl/process/app.js b/lib/utils/mode/impl/process/app.js new file mode 100644 index 0000000..3f09d0c --- /dev/null +++ b/lib/utils/mode/impl/process/app.js @@ -0,0 +1,151 @@ +'use strict'; + +const cluster = require('cluster'); +const cfork = require('cfork'); +const sendmessage = require('sendmessage'); +const gracefulExit = require('graceful-process'); + +const { BaseAppWorker, BaseAppUtils } = require('../../base/app'); +const terminate = require('../../../terminate'); + +class AppWorker extends BaseAppWorker { + get id() { + return this.instance.id; + } + + get workerId() { + return this.instance.process.pid; + } + + get state() { + return this.instance.state; + } + + get exitedAfterDisconnect() { + return this.instance.exitedAfterDisconnect; + } + + get exitCode() { + return this.instance.exitCode; + } + + send(...args) { + sendmessage(this.instance, ...args); + } + + clean() { + this.instance.removeAllListeners(); + } + + static on(event, callback) { + process.on(event, callback); + } + + static send(data) { + process.send(data); + } + + static kill() { + process.exitCode = 1; + process.kill(process.pid); + } + + static gracefulExit(options) { + gracefulExit(options); + } +} + +class AppUtils extends BaseAppUtils { + fork() { + this.startTime = Date.now(); + this.isAllWorkerStarted = false; + this.startSuccessCount = 0; + + const args = [ JSON.stringify(this.options) ]; + this.log('[master] start appWorker with args %j', args); + cfork({ + exec: this.getAppWorkerFile(), + args, + silent: false, + count: this.options.workers, + // don't refork in local env + refork: this.isProduction, + windowsHide: process.platform === 'win32', + }); + + let debugPort = process.debugPort; + cluster.on('fork', worker => { + const appWorker = new AppWorker(worker); + this.emit('worker_forked', appWorker); + appWorker.disableRefork = true; + worker.on('message', msg => { + if (typeof msg === 'string') { + msg = { + action: msg, + data: msg, + }; + } + msg.from = 'app'; + this.messenger.send(msg); + }); + this.log('[master] app_worker#%s:%s start, state: %s, current workers: %j', + appWorker.id, appWorker.workerId, appWorker.state, Object.keys(cluster.workers)); + + // send debug message, due to `brk` scence, send here instead of app_worker.js + if (this.options.isDebug) { + debugPort++; + this.messenger.send({ + to: 'parent', + from: 'app', + action: 'debug', + data: { + debugPort, + pid: appWorker.workerId, + }, + }); + } + }); + cluster.on('disconnect', worker => { + const appWorker = new AppWorker(worker); + this.logger.info('[master] app_worker#%s:%s disconnect, suicide: %s, state: %s, current workers: %j', + appWorker.id, appWorker.workerId, appWorker.exitedAfterDisconnect, appWorker.state, Object.keys(cluster.workers)); + }); + cluster.on('exit', (worker, code, signal) => { + const appWorker = new AppWorker(worker); + this.messenger.send({ + action: 'app-exit', + data: { + workerId: appWorker.workerId, + code, + signal, + }, + to: 'master', + from: 'app', + }); + }); + cluster.on('listening', (worker, address) => { + const appWorker = new AppWorker(worker); + this.messenger.send({ + action: 'app-start', + data: { + workerId: appWorker.workerId, + address, + }, + to: 'master', + from: 'app', + }); + }); + + return this; + } + + async kill(timeout) { + await Promise.all(Object.keys(cluster.workers).map(id => { + const worker = cluster.workers[id]; + worker.disableRefork = true; + return terminate(worker, timeout); + })); + } +} + +module.exports = { AppWorker, AppUtils }; diff --git a/lib/utils/mode/impl/worker_threads/agent.js b/lib/utils/mode/impl/worker_threads/agent.js new file mode 100644 index 0000000..302d402 --- /dev/null +++ b/lib/utils/mode/impl/worker_threads/agent.js @@ -0,0 +1,100 @@ +'use strict'; + +const workerThreads = require('worker_threads'); +const { BaseAgentUtils, BaseAgentWorker } = require('../../base/agent'); + +class AgentWorker extends BaseAgentWorker { + get workerId() { + return this.instance.threadId; + } + + send(...args) { + this.instance.postMessage(...args); + } + + static send(data) { + workerThreads.parentPort.postMessage(data); + } + + static kill() { + // in worker_threads, process.exit + // does not stop the whole program, just the single thread + process.exit(1); + } + + static gracefulExit(options) { + const { beforeExit } = options; + process.on('exit', async code => { + if (typeof beforeExit === 'function') { + await beforeExit(); + } + process.exit(code); + }); + } +} + +class AgentUtils extends BaseAgentUtils { + #worker = null; + #id = 0; + + fork() { + this.startTime = Date.now(); + + const argv = [ JSON.stringify(this.options) ]; + + const agentPath = this.getAgentWorkerFile(); + const worker = this.#worker = new workerThreads.Worker(agentPath, { argv }); + const agentWorker = this.instance = new AgentWorker(worker); + this.emit('agent_forked', agentWorker); + agentWorker.status = 'starting'; + agentWorker.id = ++this.#id; + this.log('[master] agent_worker#%s:%s start with worker_threads', + agentWorker.id, agentWorker.workerId); + + worker.on('message', msg => { + if (typeof msg === 'string') { + msg = { + action: msg, + data: msg, + }; + } + msg.from = 'agent'; + this.messenger.send(msg); + }); + + worker.on('error', err => { + err.name = 'AgentWorkerError'; + err.id = worker.id; + err.pid = agentWorker.workerId; + this.logger.error(err); + }); + + // agent exit message + worker.once('exit', (code, signal) => { + this.messenger.send({ + action: 'agent-exit', + data: { + code, + signal, + }, + to: 'master', + from: 'agent', + }); + }); + } + + clean() { + this.#worker.removeAllListeners(); + } + + async kill() { + const worker = this.#worker; + if (worker) { + this.log('[master] kill agent worker(worker_threads) by worker.terminate()'); + this.clean(); + worker.terminate(); + } + } +} + +module.exports = { AgentWorker, AgentUtils }; diff --git a/lib/utils/mode/impl/worker_threads/app.js b/lib/utils/mode/impl/worker_threads/app.js new file mode 100644 index 0000000..cb94e3c --- /dev/null +++ b/lib/utils/mode/impl/worker_threads/app.js @@ -0,0 +1,5 @@ +'use strict'; + +// worker_threads not support send handle + +module.exports = require('../process/app'); diff --git a/lib/utils/options.js b/lib/utils/options.js index 2b4ed44..9f18e34 100644 --- a/lib/utils/options.js +++ b/lib/utils/options.js @@ -16,6 +16,7 @@ module.exports = function(options) { workers: null, plugins: null, https: false, + startMode: 'process', }; options = extend(defaults, options); if (!options.workers) { diff --git a/lib/utils/terminate.js b/lib/utils/terminate.js index 0624779..302aa2f 100644 --- a/lib/utils/terminate.js +++ b/lib/utils/terminate.js @@ -4,19 +4,19 @@ const sleep = require('mz-modules/sleep'); const awaitEvent = require('await-event'); const pstree = require('ps-tree'); -module.exports = function* (subProcess, timeout) { +module.exports = async function(subProcess, timeout) { const pid = subProcess.process ? subProcess.process.pid : subProcess.pid; - const childPids = yield getChildPids(pid); - yield [ + const childPids = await getChildPids(pid); + await Promise.all([ killProcess(subProcess, timeout), killChildren(childPids, timeout), - ]; + ]); }; // kill process, if SIGTERM not work, try SIGKILL -function* killProcess(subProcess, timeout) { +async function killProcess(subProcess, timeout) { subProcess.kill('SIGTERM'); - yield Promise.race([ + await Promise.race([ awaitEvent(subProcess, 'exit'), sleep(timeout), ]); @@ -28,7 +28,7 @@ function* killProcess(subProcess, timeout) { } // kill all children processes, if SIGTERM not work, try SIGKILL -function* killChildren(children, timeout) { +async function killChildren(children, timeout) { if (!children.length) return; kill(children, 'SIGTERM'); @@ -38,7 +38,7 @@ function* killChildren(children, timeout) { let unterminated = []; while (Date.now() - start < timeout - checkInterval) { - yield sleep(checkInterval); + await sleep(checkInterval); unterminated = getUnterminatedProcesses(children); if (!unterminated.length) return; } diff --git a/package.json b/package.json index b6bf821..a9f6a15 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,6 @@ "await-event": "^2.1.0", "cfork": "^1.7.1", "cluster-reload": "^1.0.2", - "co": "^4.6.0", "debug": "^4.1.1", "depd": "^2.0.0", "detect-port": "^1.3.0", @@ -56,11 +55,11 @@ "coffee": "^5.2.1", "egg": "^2.20.0", "egg-bin": "^4.11.1", - "egg-ci": "^1.19.0", + "egg-ci": "^2.2.0", "egg-errors": "^2.2.0", "egg-mock": "^3.22.1", - "eslint": "^5.15.1", - "eslint-config-egg": "^7.2.0", + "eslint": "^8.26.0", + "eslint-config-egg": "^12.1.0", "mz": "^2.7.0", "pedding": "^1.1.0", "supertest": "^4.0.0", @@ -72,6 +71,6 @@ }, "ci": { "type": "github", - "version": "8, 10, 12, 14, 16" + "version": "14, 16, 18" } } diff --git a/test/agent_worker.test.js b/test/agent_worker.test.js index b3e6021..3de2972 100644 --- a/test/agent_worker.test.js +++ b/test/agent_worker.test.js @@ -114,7 +114,7 @@ describe('test/agent_worker.test.js', () => { return app // .debug() .expect('code', 1) - .expect('stderr', /CustomError: mock error \[https\:\/\/eggjs\.org\/zh-cn\/faq\/customPlugin_99\]/) + .expect('stderr', /CustomError: mock error \[ https\:\/\/eggjs\.org\/zh-cn\/faq\/customPlugin_99 \]/) .end(); }); @@ -123,7 +123,7 @@ describe('test/agent_worker.test.js', () => { return app // .debug() .expect('code', 1) - .expect('stderr', /CustomError: mock error \[https\:\/\/eggjs\.org\/zh-cn\/faq\/customPlugin_99\]/) + .expect('stderr', /CustomError: mock error \[ https\:\/\/eggjs\.org\/zh-cn\/faq\/customPlugin_99 \]/) .end(); }); diff --git a/test/app_worker.test.js b/test/app_worker.test.js index 25d0409..7f845e6 100644 --- a/test/app_worker.test.js +++ b/test/app_worker.test.js @@ -63,7 +63,7 @@ describe('test/app_worker.test.js', () => { return app // .debug() .expect('code', 1) - .expect('stderr', /CustomError: mock error \[https\:\/\/eggjs\.org\/zh-cn\/faq\/customPlugin_99\]/) + .expect('stderr', /CustomError: mock error \[ https\:\/\/eggjs\.org\/zh-cn\/faq\/customPlugin_99 \]/) .end(); }); @@ -79,7 +79,7 @@ describe('test/app_worker.test.js', () => { return app // .debug() .expect('code', 1) - .expect('stderr', /CustomError: mock error \[https\:\/\/eggjs\.org\/zh-cn\/faq\/customPlugin_99\]/) + .expect('stderr', /CustomError: mock error \[ https\:\/\/eggjs\.org\/zh-cn\/faq\/customPlugin_99 \]/) .end(); }); diff --git a/test/fixtures/apps/agent-worker-threads-error/agent.js b/test/fixtures/apps/agent-worker-threads-error/agent.js new file mode 100644 index 0000000..2631d9a --- /dev/null +++ b/test/fixtures/apps/agent-worker-threads-error/agent.js @@ -0,0 +1,6 @@ +'use strict'; + +module.exports = agent => { + const done = agent.readyCallback('prepare-agent'); + done(new Error('worker_threads mock error')); +}; diff --git a/test/fixtures/apps/agent-worker-threads-error/package.json b/test/fixtures/apps/agent-worker-threads-error/package.json new file mode 100644 index 0000000..e625f86 --- /dev/null +++ b/test/fixtures/apps/agent-worker-threads-error/package.json @@ -0,0 +1,3 @@ +{ + "name": "agent-worker-threads-error" +} \ No newline at end of file diff --git a/test/fixtures/apps/agent-worker-threads/agent.js b/test/fixtures/apps/agent-worker-threads/agent.js new file mode 100644 index 0000000..bcdca93 --- /dev/null +++ b/test/fixtures/apps/agent-worker-threads/agent.js @@ -0,0 +1,7 @@ +'use strict'; + +const workerThreads = require('worker_threads'); + +module.exports = () => { + console.log('workerId: %d', workerThreads.threadId); +}; diff --git a/test/fixtures/apps/agent-worker-threads/package.json b/test/fixtures/apps/agent-worker-threads/package.json new file mode 100644 index 0000000..dffc645 --- /dev/null +++ b/test/fixtures/apps/agent-worker-threads/package.json @@ -0,0 +1,3 @@ +{ + "name": "agent-worker-threads" +} \ No newline at end of file diff --git a/test/fixtures/apps/script-start/start-server.js b/test/fixtures/apps/script-start/start-server.js index 8ed1bc6..471e576 100644 --- a/test/fixtures/apps/script-start/start-server.js +++ b/test/fixtures/apps/script-start/start-server.js @@ -1,13 +1,12 @@ 'use strict'; -const co = require('co'); const sleep = require('mz-modules/sleep'); const utils = require('../../../utils'); -co(function* () { +(async function() { const app = utils.cluster('apps/agent-exit'); app.debug(); - yield app.end(); + await app.end(); app.proc.on('message', () => { process.send(app.proc.pid, () => { @@ -20,5 +19,5 @@ co(function* () { }); }); - yield sleep(3000); -}); + await sleep(3000); +})(); diff --git a/test/worker_threads.test.js b/test/worker_threads.test.js new file mode 100644 index 0000000..fdf9d66 --- /dev/null +++ b/test/worker_threads.test.js @@ -0,0 +1,31 @@ +'use strict'; + +const utils = require('./utils'); + +describe('worker_threads', () => { + let app; + + describe('Fork Agent', () => { + afterEach(() => app.close()); + + it('support config agent debug port', async () => { + app = utils.cluster('apps/agent-worker-threads', { startMode: 'worker_threads' }); + app.debug(); + return app + .expect('stdout', /workerId: 1/) + .end(); + }); + + it('should exit when emit error during agent worker boot', () => { + app = utils.cluster('apps/agent-worker-threads-error'); + app.debug(); + return app + .debug() + .expect('code', 1) + .expect('stderr', /worker_threads mock error/) + .expect('stderr', /\[agent_worker\] start error, exiting with code:1/) + .expect('stderr', /\[master\] exit with code:1/) + .end(); + }); + }); +});