Skip to content

Commit

Permalink
Improved debug logging, error handling, etc. for workers.
Browse files Browse the repository at this point in the history
  • Loading branch information
ionut.stan@bigstep.com committed Jun 14, 2020
1 parent 0f0af38 commit 8930b89
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 70 deletions.
4 changes: 2 additions & 2 deletions .eslintrc.json
Expand Up @@ -91,7 +91,7 @@
"no-irregular-whitespace": 2,
"no-iterator": 2,
"no-label-var": 2,
"no-labels": 2,
"no-labels": 0,
"no-lone-blocks": 2,
"no-mixed-spaces-and-tabs": 2,
"no-multi-spaces": 2,
Expand Down Expand Up @@ -133,7 +133,7 @@
"quotes": [2, "double", {"avoidEscape": true, "allowTemplateLiterals": true}],
"radix": 0,
"semi": [2, "always"],
"semi-spacing": [2, { "before": false, "after": true }],
"semi-spacing": [0, { "before": false, "after": true }],
"space-before-blocks": [2, "always"],
"space-before-function-paren": [2, "never"],
"space-in-parens": [2, "never"],
Expand Down
2 changes: 1 addition & 1 deletion package.json
@@ -1,7 +1,7 @@
{
"name": "jsonrpc-bidirectional",
"description": "Bidirectional JSONRPC over web sockets or HTTP with extensive plugin support.",
"version": "9.8.2",
"version": "10.0.1",
"scripts": {
"build": "node --experimental-worker build.js",
"prepublish": "node --experimental-worker build.js && node --expose-gc --max-old-space-size=1024 --experimental-worker tests/main.js",
Expand Down
13 changes: 9 additions & 4 deletions src/NodeClusterBase/MasterEndpoint.js
Expand Up @@ -97,20 +97,25 @@ class MasterEndpoint extends NodeMultiCoreCPUBase.MasterEndpoint
console.log(`Worker with PID ${worker.process.pid} and persistentId ${nPersistentWorkerID} died. Exit code: ${nExitCode}. Signal: ${nKillSignal}.`);

this.arrFailureTimestamps.push(new Date().getTime());
this.arrFailureTimestamps = this.arrFailureTimestamps.filter((nMillisecondsUnixTime) => {
return nMillisecondsUnixTime >= new Date().getTime() - (60 * 2 * 1000);
this.arrFailureTimestamps = this.arrFailureTimestamps.filter((nMillisecondsUnixTimeOfFailure) => {
return nMillisecondsUnixTimeOfFailure >= new Date().getTime() - (60 * 2 * 1000);
});

if(this.arrFailureTimestamps.length / Math.max(os.cpus().length, 1) > 4)
const nMaxFailuresPerMaxWorkers = process.uptime() < 15 /*seconds*/ ? Math.min(this.maxWorkersCount * 2, 20 /*times*/) : 20 /*times*/;
if(this.arrFailureTimestamps.length / Math.max(this.maxWorkersCount, 1) > nMaxFailuresPerMaxWorkers)
{
console.error(`[Master] *Not* adding a worker because another worker has died. Doing a .gracefulExit() instead because the number of worker failures divided by .maxWorkersCount is greater than ${nMaxFailuresPerMaxWorkers} over the last 2 minutes. ${this.arrFailureTimestamps.length / Math.max(this.maxWorkersCount, 1)} > ${nMaxFailuresPerMaxWorkers}. Process uptime is ${process.uptime()} seconds.`);
await this.gracefulExit(null);
}
else
{
if(!this.bShuttingDown)
{
await sleep(500);
const nSleepMilliSeconds = Math.max(800 + 1000 * this.readyWorkersCount, 3000);
await sleep(`Sleeping ${nSleepMilliSeconds} milliseconds before replacing exited worker.`);
// cluster.fork();

console.error("[Master] Adding a worker because another worker has exited.");
this._addWorker(nPersistentWorkerID);
}
}
Expand Down
106 changes: 94 additions & 12 deletions src/NodeMultiCoreCPUBase/MasterEndpoint.js
Expand Up @@ -100,14 +100,35 @@ class MasterEndpoint extends JSONRPC.EndpointBase
/**
* The object has worker IDs as keys and object values like this: {client: JSONRPC.Client, ready: boolean}.
*
* @returns {Object<workerID:number, {client:JSONRPC.Client, ready:boolean}>}
* @returns {Object<workerID:number, {client:JSONRPC.Client, ready:boolean, exited:boolean}>}
*/
get workerClients()
{
// Hands back the internal objWorkerIDToState map itself, not a copy.
return this.objWorkerIDToState;
}


/**
* DO NOT use this count to determine if more workers need to be created,
* because it *excludes* workers which are in the process of becoming ready.
*
* @returns {integer}
*/
get readyWorkersCount()
{
let nCount = 0;
for(const objWorkerClient of Object.values(this.objWorkerIDToState))
{
if(objWorkerClient.ready)
{
++nCount;
}
}

return nCount;
}


/**
* @param {number} nWorkersCount
*/
Expand Down Expand Up @@ -415,7 +436,10 @@ class MasterEndpoint extends JSONRPC.EndpointBase

for(const nWorkerID in this.objWorkerIDToState)
{
if(this.objWorkerIDToState[nWorkerID].ready)
if(
this.objWorkerIDToState[nWorkerID].ready
&& !this.objWorkerIDToState[nWorkerID].exited
)
{
// Do not await, need these in parallel.
/*await*/ this.objWorkerIDToState[nWorkerID].client.gracefulExit()
Expand All @@ -440,20 +464,49 @@ class MasterEndpoint extends JSONRPC.EndpointBase
{
nWorkersGracefulExitTimeoutID = setTimeout(
() => {
console.error("Timed out waiting for workers' gracefulExit() to complete.");
console.error("[Master] Timed out waiting for workers' gracefulExit() to complete.");
process.exit(1);
},
this._nGracefulExitTimeoutMilliseconds
);
}


console.log("Waiting for workers to exit gracefully.");
while(!!Object.keys(this.objWorkerIDToState).length)
console.log("[Master] Waiting for workers to exit gracefully.");
await sleep(3000);

waitForAllWorkers:
while(Object.values(this.workerClients).length)
{
await sleep(1000);
let bLogDelimited = false;
let bWorkersStillAlive = false;
for(const strWorkerID of Object.keys(this.workerClients))
{
if(!this.workerClients[strWorkerID].exited)
{
if(!bLogDelimited)
{
console.error("------------------------------------------------------------------");
bLogDelimited = true;
}

console.error(`Worker with ID ${strWorkerID} has not yet exited. Waiting...`);
bWorkersStillAlive = true;
}
}

if(bWorkersStillAlive)
{
await sleep(2000);
continue waitForAllWorkers;
}

if(!bWorkersStillAlive)
{
break waitForAllWorkers;
}
}
console.log("All workers have exited.");
console.log("[Master] All workers have exited.");


if(nWorkersGracefulExitTimeoutID !== null)
Expand All @@ -466,7 +519,7 @@ class MasterEndpoint extends JSONRPC.EndpointBase
await this._stopServices();


console.log("[" + process.pid + "] Master process exiting gracefully.");
console.log("Master process exiting gracefully.");
process.exit(0);
}

Expand All @@ -479,15 +532,16 @@ class MasterEndpoint extends JSONRPC.EndpointBase
*/
async ping(incomingRequest, strReturn)
{
console.log("Worker said: " + JSON.stringify(strReturn));
console.log("[Master] [ping] Worker said: " + JSON.stringify(strReturn));
return strReturn;
}

async sendTransferListTest(incomingRequest, arrayBufferForTest)
{
console.log("Received buffer", arrayBufferForTest);
console.log("[sendTransferListTest] Received buffer: ", arrayBufferForTest);
}


/**
* @param {JSONRPC.IncomingRequest} incomingRequest
* @param {number} nWorkerID
Expand All @@ -499,19 +553,37 @@ class MasterEndpoint extends JSONRPC.EndpointBase
*/
async rpcWorker(incomingRequest, nWorkerID, strFunctionName, arrParams, bNotification = false)
{
let nWaitForReadyTriesLeft = 10;
while(
this.workerClients[nWorkerID]
&& !this.workerClients[nWorkerID].ready
&& !this.workerClients[nWorkerID].exited
&& --nWaitForReadyTriesLeft >= 0
)
{
console.error(`[Master] Can't RPC into Cluster worker.id ${nWorkerID}, the RPC client has not signaled it is ready for cluster IPC RPC, yet. Sleeping 1 second before re-rechecking ready status. ${nWaitForReadyTriesLeft} future retries left. The RPC call to worker.${strFunctionName}() will be continue normally if the ready status becomes true.`);
await sleep(1000);
}

if(!this.workerClients[nWorkerID])
{
throw new JSONRPC.Exception(`Cluster worker.id ${nWorkerID} is not alive.`);
throw new JSONRPC.Exception(`[Master] Can't RPC worker.${strFunctionName}() into Cluster worker.id ${nWorkerID}, it never existed (or is no longer alive and the master process is exiting).`);
}

if(this.workerClients[nWorkerID].exited)
{
throw new JSONRPC.Exception(`[Master] Can't RPC worker.${strFunctionName}() into cluster worker.id ${nWorkerID}, it has already exited.`);
}

if(!this.workerClients[nWorkerID].ready)
{
throw new JSONRPC.Exception(`Cluster worker.id ${nWorkerID} RPC client has not signaled it is ready for cluster IPC RPC, yet.`);
throw new JSONRPC.Exception(`[Master] Can't RPC worker.${strFunctionName}() into Cluster worker.id ${nWorkerID}, the RPC client has not signaled it is ready for cluster IPC RPC, yet.`);
}

return await this.workerClients[nWorkerID].client.rpc(strFunctionName, arrParams, bNotification);
}


/**
* @typedef {{ message: string, stack: string=, code: number=, type: string=, errorClass: string }} ErrorObject
*
Expand All @@ -530,6 +602,16 @@ class MasterEndpoint extends JSONRPC.EndpointBase

for(const strWorkerID of Object.keys(this.workerClients))
{
if(!this.workerClients[strWorkerID].ready)
{
continue;
}

if(this.workerClients[strWorkerID].exited)
{
continue;
}

const nWorkerID = parseInt(strWorkerID, 10);
arrPromises.push(new Promise(async(fnResolve, fnReject) => {
try
Expand Down
80 changes: 61 additions & 19 deletions src/NodeMultiCoreCPUBase/WorkerEndpoint.js
Expand Up @@ -10,6 +10,8 @@ const JSONRPC = {
}
};

const sleep = require("sleep-promise");


/**
* Extend this class to export extra worker RPC APIs.
Expand All @@ -36,6 +38,7 @@ class WorkerEndpoint extends JSONRPC.EndpointBase
this.bShuttingDown = false;

this._bWorkerStarted = false;
this._promiseStart = null;

this._nPersistentWorkerID = undefined;
}
Expand Down Expand Up @@ -75,12 +78,31 @@ class WorkerEndpoint extends JSONRPC.EndpointBase
{
if(!this._masterClient)
{
throw new Error("The master client is ready only after calling await .startWorker().");
if(!this._bWorkerStarted)
{
console.error(`
Premature access to the .masterClient property as it was not initialized by WorkerEndpoint.start().
[WorkerEndpoint] process.uptime(): ${process.uptime()} seconds.
Desperatly calling .start() (possibly prematurely) inside the .masterClient getter to allow a future access to .masterClient to succeed and not throw.
Services might not have been started yet, see .start().
As a result, this worker might yet be ready for receving workers IPC RPC.
Lazy or premature init.
Normally, the application implementing this needs to to call .start() explicitly to signal it is ready to receive RPC calls over workers IPC.
`);

this.start().catch(console.error);
}

throw new Error("The .masterClient property was not initialized by .start().");
}

return this._masterClient;
}


async getIfNotPresentPersistentWorkerID()
{
if(this._nPersistentWorkerID === undefined)
Expand All @@ -93,7 +115,7 @@ class WorkerEndpoint extends JSONRPC.EndpointBase


/**
* This overridable function is called and awaited inside startWorker().
* This overridable function is called and awaited inside start().
*
* This mustn't be called through JSONRPC.
*
Expand Down Expand Up @@ -133,30 +155,50 @@ class WorkerEndpoint extends JSONRPC.EndpointBase
*/
async start()
{
if(this._bWorkerStarted)
if(this._promiseStart)
{
throw new Error("Worker is already started.");
return this._promiseStart;
}
this._bWorkerStarted = true;


this._jsonrpcServer = new JSONRPC.Server();
this._bidirectionalWorkerRouter = await this._makeBidirectionalRouter();

// By default, JSONRPC.Server rejects all requests as not authenticated and not authorized.
this._jsonrpcServer.addPlugin(new JSONRPC.Plugins.Server.AuthenticationSkip());
this._jsonrpcServer.addPlugin(new JSONRPC.Plugins.Server.AuthorizeAll());
this._promiseStart = new Promise(async(fnResolve, fnReject) => {
try
{
if(this._bWorkerStarted)
{
throw new Error("WorkerEndpoint.start() was already called.");
}
this._bWorkerStarted = true;


this._jsonrpcServer = new JSONRPC.Server();
this._bidirectionalWorkerRouter = await this._makeBidirectionalRouter();

// By default, JSONRPC.Server rejects all requests as not authenticated and not authorized.
this._jsonrpcServer.addPlugin(new JSONRPC.Plugins.Server.AuthenticationSkip());
this._jsonrpcServer.addPlugin(new JSONRPC.Plugins.Server.AuthorizeAll());

const nConnectionID = await this._bidirectionalWorkerRouter.addWorker(await this._currentWorker(), "/api-workers/IPC");
this._masterClient = this._bidirectionalWorkerRouter.connectionIDToSingletonClient(nConnectionID, this.ReverseCallsClientClass);

this._jsonrpcServer.registerEndpoint(this);

// BidirectionalWorkerRouter requires to know when JSONRPC has finished its setup to avoid very likely race conditions.
await this._masterClient.rpc("rpc.connectToEndpoint", ["/api-workers/IPC"]);

const nConnectionID = await this._bidirectionalWorkerRouter.addWorker(await this._currentWorker(), "/api-workers/IPC");
this._masterClient = this._bidirectionalWorkerRouter.connectionIDToSingletonClient(nConnectionID, this.ReverseCallsClientClass);

await this._startServices();
await this._masterClient.workerServicesReady(await this._currentWorkerID());

this._jsonrpcServer.registerEndpoint(this);

// BidirectionalWorkerRouter requires to know when JSONRPC has finished its setup to avoid very likely race conditions.
await this._masterClient.rpc("rpc.connectToEndpoint", ["/api-workers/IPC"]);
fnResolve();
}
catch(error)
{
fnReject(error);
}
});

await this._startServices();
await this._masterClient.workerServicesReady(await this._currentWorkerID());
return this._promiseStart;
}


Expand Down

0 comments on commit 8930b89

Please sign in to comment.