From f4721a7e0044246e16bc5282d0483347ad8133b7 Mon Sep 17 00:00:00 2001 From: hyj1991 Date: Sun, 30 Oct 2022 18:47:58 +0800 Subject: [PATCH] feat: support fork app worker by worker_threads --- lib/utils/mode/impl/worker_threads/agent.js | 2 +- lib/utils/mode/impl/worker_threads/app.js | 68 ++++++++++++++++++++- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/lib/utils/mode/impl/worker_threads/agent.js b/lib/utils/mode/impl/worker_threads/agent.js index 302d402..74e6f11 100644 --- a/lib/utils/mode/impl/worker_threads/agent.js +++ b/lib/utils/mode/impl/worker_threads/agent.js @@ -90,7 +90,7 @@ class AgentUtils extends BaseAgentUtils { async kill() { const worker = this.#worker; if (worker) { - this.log('[master] kill agent worker(worker_threads) by worker.terminate()'); + this.log(`[master] kill agent worker#${this.#id} (worker_threads) by worker.terminate()`); this.clean(); worker.terminate(); } diff --git a/lib/utils/mode/impl/worker_threads/app.js b/lib/utils/mode/impl/worker_threads/app.js index cb94e3c..329588d 100644 --- a/lib/utils/mode/impl/worker_threads/app.js +++ b/lib/utils/mode/impl/worker_threads/app.js @@ -1,5 +1,69 @@ 'use strict'; -// worker_threads not support send handle +const workerThreads = require('worker_threads'); +const { BaseAppWorker, BaseAppUtils } = require('../../base/app'); -module.exports = require('../process/app'); +class AppWorker extends BaseAppWorker { + #id = 0; + + get id() { + return this.#id; + } + + get workerId() { + return this.instance.threadId; + } + + get exitCode() { + return this.instance.exitCode; + } + + send(...args) { + this.instance.postMessage(...args); + } + + clean() { + this.instance.removeAllListeners(); + } + + static on(event, callback) { + workerThreads.parentPort.on(event, callback); + } + + static send(data) { + workerThreads.parentPort.postMessage(data); + } + + static kill() { + process.exit(1); + } + + static gracefulExit(options) { + const { beforeExit } = options; + process.on('exit', async code => { + if (typeof beforeExit === 'function') { + await beforeExit(); + } + process.exit(code); + }); + } +} + +class AppUtils extends BaseAppUtils { + #workers = []; + + fork() { + + return this; + } + + async kill() { + for (const worker of this.#workers) { + this.log(`[master] kill app worker#${worker.id} (worker_threads) by worker.terminate()`); + worker.removeAllListeners(); + worker.terminate(); + } + } +} + +module.exports = { AppWorker, AppUtils };