Skip to content

Commit

Permalink
feat: support fork app worker by worker_threads
Browse files Browse the repository at this point in the history
  • Loading branch information
hyj1991 committed Oct 30, 2022
1 parent fab0c80 commit 07a79a7
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 7 deletions.
12 changes: 10 additions & 2 deletions lib/app_worker.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';

const freePort = require('detect-port');

// $ node app_worker.js options
const options = JSON.parse(process.argv[2]);
if (options.require) {
Expand Down Expand Up @@ -61,7 +63,7 @@ function startTimeoutHandler() {
exitProcess();
}

function startServer(err) {
async function startServer(err) {
if (err) {
consoleLogger.error(err);
consoleLogger.error('[app_worker] start error, exiting with code:1');
Expand Down Expand Up @@ -113,12 +115,18 @@ function startServer(err) {
exitProcess();
return;
}
const args = [ port ];
const args = [ await freePort(port) ];
if (listenConfig.hostname) args.push(listenConfig.hostname);
debug('listen options %s', args);
server.listen(...args);
}
}

AppWorker.send({
to: 'master',
action: 'listening',
data: server.address(),
});
}

AppWorker.gracefulExit({
Expand Down
2 changes: 1 addition & 1 deletion lib/utils/mode/impl/process/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class AppUtils extends BaseAppUtils {
this.startSuccessCount = 0;

const args = [ JSON.stringify(this.options) ];
this.log('[master] start appWorker with args %j', args);
this.log('[master] start appWorker with args %j (process)', args);
cfork({
exec: this.getAppWorkerFile(),
args,
Expand Down
6 changes: 4 additions & 2 deletions lib/utils/mode/impl/worker_threads/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ class AgentUtils extends BaseAgentUtils {
fork() {
this.startTime = Date.now();

// start agent worker
const argv = [ JSON.stringify(this.options) ];

const agentPath = this.getAgentWorkerFile();
const worker = this.#worker = new workerThreads.Worker(agentPath, { argv });

// wrap agent worker
const agentWorker = this.instance = new AgentWorker(worker);
this.emit('agent_forked', agentWorker);
agentWorker.status = 'starting';
Expand Down Expand Up @@ -90,7 +92,7 @@ class AgentUtils extends BaseAgentUtils {
async kill() {
const worker = this.#worker;
if (worker) {
this.log('[master] kill agent worker(worker_threads) by worker.terminate()');
this.log(`[master] kill agent worker#${this.#id} (worker_threads) by worker.terminate()`);
this.clean();
worker.terminate();
}
Expand Down
156 changes: 154 additions & 2 deletions lib/utils/mode/impl/worker_threads/app.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,157 @@
'use strict';

// worker_threads not support send handle
const workerThreads = require('worker_threads');
const { BaseAppWorker, BaseAppUtils } = require('../../base/app');

module.exports = require('../process/app');
class AppWorker extends BaseAppWorker {
#id = 0;

constructor(instance, id) {
super(instance);
this.#id = id;
}

get id() {
return this.#id;
}

get workerId() {
return this.instance.threadId;
}

get state() {
return 'worker_threads working';
}

get exitCode() {
return this.instance.exitCode;
}

send(...args) {
this.instance.postMessage(...args);
}

clean() {
this.instance.removeAllListeners();
}

static on(event, callback) {
workerThreads.parentPort.on(event, callback);
}

static send(data) {
workerThreads.parentPort.postMessage(data);
}

static kill() {
process.exit(1);
}

static gracefulExit(options) {
const { beforeExit } = options;
process.on('exit', async code => {
if (typeof beforeExit === 'function') {
await beforeExit();
}
process.exit(code);
});
}
}

class AppUtils extends BaseAppUtils {
#workers = [];

#forkSingle(appPath, options, id) {
// start app worker
const worker = new workerThreads.Worker(appPath, options);
this.#workers.push(worker);

// wrap app worker
const appWorker = new AppWorker(worker, id);
this.emit('worker_forked', appWorker);
appWorker.disableRefork = true;
worker.on('message', msg => {
if (typeof msg === 'string') {
msg = {
action: msg,
data: msg,
};
}
msg.from = 'app';
this.messenger.send(msg);
});
this.log('[master] app_worker#%s (tid:%s) start', appWorker.id, appWorker.workerId);

// send debug message, due to `brk` scence, send here instead of app_worker.js
let debugPort = process.debugPort;
if (this.options.isDebug) {
debugPort++;
this.messenger.send({
to: 'parent',
from: 'app',
action: 'debug',
data: {
debugPort,
pid: appWorker.workerId,
},
});
}

// handle worker listening
worker.on('message', ({ action, data: address }) => {
if (action !== 'listening') {
return;
}

this.log(`[master] worker_threads listening at ${address.address}:${address.port} (%sms)`,
Date.now() - this.startTime);
this.messenger.send({
action: 'app-start',
data: {
workerId: appWorker.workerId,
address,
},
to: 'master',
from: 'app',
});

});

// handle worker exit
worker.on('exit', code => {
this.messenger.send({
action: 'app-exit',
data: {
workerId: appWorker.workerId,
code,
},
to: 'master',
from: 'app',
});
});
}

fork() {
this.startTime = Date.now();
this.isAllWorkerStarted = false;
this.startSuccessCount = 0;

const argv = [ JSON.stringify(this.options) ];

for (let i = 0; i < this.options.workers;) {
this.#forkSingle(this.getAppWorkerFile(), { argv }, ++i);
}

return this;
}

async kill() {
for (const worker of this.#workers) {
this.log(`[master] kill app worker#${worker.id} (worker_threads) by worker.terminate()`);
worker.removeAllListeners();
worker.terminate();
}
}
}

module.exports = { AppWorker, AppUtils };

0 comments on commit 07a79a7

Please sign in to comment.