
AsynCluster Module

This module sets up processLimit worker processes, each running up to asyncLimit tasks asynchronously.

This module makes use of two libraries:

  • cluster: part of the Node.js core API, used to take advantage of multi-core systems by launching a cluster of Node.js processes to handle the load.

  • Async.js: a utility module that provides straightforward, powerful functions for working with asynchronous JavaScript.

Architecture

A master process forks processLimit worker processes, each of which runs up to asyncLimit instances of instanceFunction on a provided set of arguments.

(architecture diagram)

API reference

An instance of this module can be started by passing the following parameters to the default export:

export default function asyncCluster({
	asyncLimit: number,
	processLimit: number,
	clusterArgs: Array[?],
	workerInitFunction: function,
	instanceFunction: function,
	workerExitCallback: function,
	masterExitCallback: function
}){...}
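For illustration, a hypothetical options object might look like the following. All of the values and the handler bodies here are assumptions for the sketch, not defaults or behaviour guaranteed by the module:

```javascript
// Hypothetical options for asyncCluster; every value here is illustrative.
const options = {
  asyncLimit: 4,    // up to 4 concurrent instanceFunction calls per worker
  processLimit: 2,  // fork 2 worker processes
  clusterArgs: ['/dumps/1.txt', '/dumps/2.txt', '/dumps/3.txt'],
  workerInitFunction: (id) => ({ workerId: id }),
  instanceFunction: ({ id, init, base }) =>
    Promise.resolve(`worker ${id} processed ${base}`),
  workerExitCallback: (results) => Promise.all(results),
  masterExitCallback: (results) => results.flat(),
};

// asyncCluster(options); // call the default export with the options
```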

Details of the arguments are as follows:

  1. asyncLimit: Maximum number of instances of instanceFunction (see below) running asynchronously in a worker process. This is not a compulsory argument; if not provided, it defaults to 4.

  2. processLimit: Maximum number of worker processes forked. This is not a compulsory argument; if not provided, it defaults to the number of CPUs in the system.

  3. clusterArgs: Arguments for the whole cluster.

    const clusterArgs = [...argsToBeProcessed]
    • It is assumed that the provided instanceFunction can process any of the provided clusterArgs.

    • The clusterArgs are split into processLimit chunks, called workerArgs.

    • Up to asyncLimit instances of instanceFunction are then run concurrently against each workerArgs chunk.

    • This is not a compulsory argument. If not provided, processLimit default arguments are generated so that processLimit workers are run.

    • In the case of data-import producers, clusterArgs are usually the full paths to the dumps (something like ${path}/${no}.txt) for instanceFunction to read and process. In the case of data-import consumers, it is null, which by default launches processLimit consumers.
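The splitting of clusterArgs into workerArgs described above can be sketched as below. Round-robin chunking and the helper name splitIntoWorkerArgs are assumptions; the module may split differently:

```javascript
// Sketch: split clusterArgs into processLimit chunks (workerArgs),
// distributing arguments round-robin across the chunks.
function splitIntoWorkerArgs(clusterArgs, processLimit) {
  const chunks = Array.from({ length: processLimit }, () => []);
  clusterArgs.forEach((arg, i) => chunks[i % processLimit].push(arg));
  return chunks;
}
```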

  4. workerInitFunction: This function is called to carry out any preprocessing required before the worker starts running instances of instanceFunction.

    function workerInitFunction(id:number) {...}
    • It takes the numerical workerId as its input.
    • The return value from workerInitFunction is combined with each of the worker's arguments before being sent over to instanceFunction. The full instanceFunction argument is
    {
    	init: valueFromInitFunction,
    	id: workerId,
    	base: theOriginalValueInTheClusterArgs
    }
    • In the case of both data-import producers and consumers, the workerInitFunction provides the #connection object (to connect with the rmq). The instanceFunctions then create a #channel to connect to the queue within each worker process.
    • Note that as creating a #connection is an expensive operation, it should be created once per process, with a #channel created as and when needed by asynchronous tasks within the process. The workerInitFunction provides this initial #connection object.
    • Also note that creating the #connection object in the master process before forking the worker processes would not help: as soon as the first worker process exits, the #connection object (essentially a file descriptor) would be closed, leaving the other workers, as well as the master, unable to use it.
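The argument object described above could be assembled per worker roughly as follows; buildInstanceArgs is a hypothetical helper for illustration, not part of the module:

```javascript
// Sketch: combine the workerInitFunction return value, the worker id,
// and each base argument into the object passed to instanceFunction.
function buildInstanceArgs(workerId, initValue, workerArgs) {
  return workerArgs.map((base) => ({
    init: initValue, // return value of workerInitFunction
    id: workerId,    // numerical worker id
    base,            // one of the original clusterArgs
  }));
}
```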
  5. instanceFunction: The base function run for each argument. It takes its arguments as an object with a base key holding one of the clusterArgs, an id key holding the workerId, and an init key holding the return value of workerInitFunction (as explained above).

    function instanceFunction({id:number, init: ?, base: ?}) {...}
    • It is assumed that instanceFunction returns a Promise carrying the results computed from processing the workerArgs.
    • In the case of data-import producers, instanceFunction takes in a file path, creates a file I/O stream, and processes the file. It returns a Promise that is fulfilled with the results once processing completes. The worker process is terminated after fulfilment of the Promise.
    • In the case of data-import consumers, instanceFunction fires the #consume function, which in theory stays alive indefinitely (as long as the queue lives). Accordingly, instanceFunction returns a never-fulfilling Promise so that #consume keeps listening for new arrivals. It is only terminated by something like a SIGINT signal from the keyboard (Ctrl+C).
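The concurrency behaviour of a worker, running at most asyncLimit instances of instanceFunction at a time, can be sketched with plain Promises. The module itself uses Async.js for this; runWithLimit below is a hypothetical stand-in:

```javascript
// Sketch: run instanceFunction over args with at most asyncLimit
// invocations in flight at once, preserving result order.
async function runWithLimit(instanceFunction, args, asyncLimit) {
  const results = [];
  let next = 0;
  async function lane() {
    // Each lane pulls the next unprocessed index until args are exhausted.
    while (next < args.length) {
      const i = next++;
      results[i] = await instanceFunction(args[i]);
    }
  }
  await Promise.all(
    Array.from({ length: Math.min(asyncLimit, args.length) }, lane)
  );
  return results;
}
```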
  6. workerExitCallback: This is called before a worker exits. It receives the return value from every instance of instanceFunction run against the arguments handled by the worker process; that is, it receives an array of Promises, one per argument received by the worker.

    function workerExitCallback(results:Array[?]) : Promise {...}
    • The workerExitCallback is expected to process the results it receives and return a Promise. It is usually intended to help the worker aggregate all the asynchronously received results in a meaningful manner. The fulfilled value of the returned Promise is sent over to the master process.
    • Note that as the master and worker processes communicate via IPC, it is not possible to send Promises between them. Hence workerExitCallback must return a Promise whose fulfilled value is sent to the master process.
    • Once the message has been passed to the master process, the worker process is killed with a SIGHUP signal.
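A minimal workerExitCallback consistent with the above might simply collapse the array of Promises with Promise.all; the shape of the returned object ({count, values}) is an assumption for the sketch:

```javascript
// Sketch: collapse the per-instance Promises into one Promise of plain
// values, since Promises themselves cannot cross the IPC boundary.
function workerExitCallback(results) {
  return Promise.all(results).then((values) => ({
    count: values.length,
    values,
  }));
}
```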
  7. masterExitCallback: This is called before the master process exits (after all the workers have exited). It receives an array containing the results received from each worker process.

    function masterExitCallback(results:Array[?]){...}
    • It is primarily intended to aggregate results from worker processes.
    • As the master and worker processes communicate using IPC, Promises etc. cannot be passed around. Consequently, all results passed to masterExitCallback are plain values and can be used to carry out any task before the master process exits.
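A minimal masterExitCallback consistent with the above might aggregate the plain per-worker results, e.g. summing counts; the {count} result shape is an assumption carried over from the workerExitCallback sketch:

```javascript
// Sketch: aggregate the already-fulfilled results received from each
// worker over IPC into one cluster-wide total.
function masterExitCallback(results) {
  return results.reduce((total, r) => total + r.count, 0);
}
```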