Streaming message queue with pub-sub, work queues, wildcards and back-pressure. Just Node and a filesystem required.

davedoesdev/mqlobber

mqlobber

mqlobber makes qlobber-fsq available remotely, over one or more connections.

Say you have a server and a number of clients, with the clients connected to the server using some mechanism which provides a stream for each connection. Create a QlobberFSQ instance on the server and for each stream, pass the instance and the stream to MQlobberServer.

On each client, pass the other end of the stream to MQlobberClient. Clients can then publish and subscribe to topics (including wildcard subscriptions). Work queues are also supported - when publishing a message, a client can specify that only one subscriber should receive it.

All data is transferred on streams multiplexed over each connection using bpmux, with full back-pressure support on each stream. Clients get a Writable when publishing a message and a Readable when receiving one.
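
The back-pressure contract on these streams is the usual Node one: write() returns false when the stream's buffer is full, and a drain event signals that it's safe to continue. Here's a minimal sketch of a helper which honours that contract. write_all is a hypothetical name, not part of mqlobber, and it works with any Writable, for instance the stream returned by publish:

```javascript
// Hypothetical helper (not part of mqlobber): write chunks to any Writable,
// pausing whenever write() reports that its buffer is full.
function write_all(writable, chunks, done)
{
    var i = 0;

    function next()
    {
        while (i < chunks.length)
        {
            if (!writable.write(chunks[i++]))
            {
                // Buffer full: wait for 'drain' before writing any more.
                writable.once('drain', next);
                return;
            }
        }
        writable.end();
        if (done) { done(); }
    }

    next();
}

// Usage with a client (assumes mq is a connected MQlobberClient):
// write_all(mq.publish('foo.bar'), ['hel', 'lo']);
```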

You can scale out horizontally by creating a number of QlobberFSQ instances (e.g. one per CPU core), all sharing the same message directory.

No other backend services are required - just Node and a filesystem.

The API is described below.

Example

First, let's create a server program which listens on a TCP port specified on the command line:

// server.js
var net = require('net'),
    QlobberFSQ = require('qlobber-fsq').QlobberFSQ,
    MQlobberServer = require('mqlobber').MQlobberServer,
    fsq = new QlobberFSQ();

fsq.on('start', function ()
{
    var server = net.createServer().listen(parseInt(process.argv[2]));
    server.on('connection', function (c)
    {
        new MQlobberServer(fsq, c);
    });
});

Next, a program which connects to the server and subscribes to messages published to a topic:

// client_subscribe.js
var assert = require('assert'),
    MQlobberClient = require('mqlobber').MQlobberClient,
    c = require('net').createConnection(parseInt(process.argv[2])),
    mq = new MQlobberClient(c),
    topic = process.argv[3];

mq.subscribe(topic, function (s, info)
{
    var msg = '';
    s.on('readable', function ()
    {
        var data;
        while ((data = this.read()) !== null)
        {
            msg += data.toString();
        }
    });
    s.on('finish', function ()
    {
        c.end();
    });
    s.on('end', function ()
    {
        console.log('received', info.topic, msg);
        assert.equal(msg, 'hello');
    });
});

Finally, a program which connects to the server and publishes a message to a topic:

// client_publish.js
var MQlobberClient = require('mqlobber').MQlobberClient,
    c = require('net').createConnection(parseInt(process.argv[2])),
    mq = new MQlobberClient(c);

mq.publish(process.argv[3], function ()
{
    c.end();
}).end('hello');

Run two servers listening on ports 8600 and 8601:

node server.js 8600 &
node server.js 8601 &

Subscribe to two topics, foo.bar and wildcard topic foo.*, one against each server:

node client_subscribe.js 8600 foo.bar &
node client_subscribe.js 8601 'foo.*' &

Then publish a message to the topic foo.bar:

node client_publish.js 8600 foo.bar

You should see the following output, one line from each subscriber:

received foo.bar hello
received foo.bar hello

Only the servers should still be running and you can now terminate them:

$ jobs
[1]-  Running                 node server.js 8600 &
[2]+  Running                 node server.js 8601 &
$ kill %1 %2
[1]-  Terminated              node server.js 8600
[2]+  Terminated              node server.js 8601

Installation

npm install mqlobber

Licence

MIT

Test

grunt test

Lint

grunt lint

Code Coverage

grunt coverage

Istanbul results are available here.

Coveralls page is here.

API

MQlobberClient(stream, [options])

Create a new MQlobberClient object for publishing and subscribing to messages via a server.

Parameters:

  • {Duplex} stream Connection to a server. The server should use MQlobberServer on its side of the connection. How the connection is made is up to the caller - it just has to supply a Duplex. For example, net.Socket or PrimusDuplex.
  • {Object} [options] Configuration options. This is passed down to QlobberDedup (which matches messages received from the server to handlers) and BPMux (which multiplexes message streams over the connection to the server). It also supports the following additional property:
    • {Buffer} [handshake_data] Application-specific handshake data to send to the server. The server-side MQlobberServer object will emit this as a handshake event to its application.

Throws:

  • {Error} If an error occurs before initiating the multiplex with the server.
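
Since handshake_data is a Buffer, structured data has to be serialized first. Here's a sketch using JSON. encode_handshake and decode_handshake are hypothetical helpers, and the token field is made up for illustration:

```javascript
// Hypothetical helpers (not part of mqlobber) for carrying a small JSON
// object in the handshake_data Buffer.
function encode_handshake(obj)
{
    return Buffer.from(JSON.stringify(obj));
}

function decode_handshake(buf)
{
    // Treat missing or empty handshake data as "nothing sent".
    if (!buf || (buf.length === 0)) { return null; }
    return JSON.parse(buf.toString());
}

// Client side (assumes c is a Duplex connected to a MQlobberServer):
// var mq = new MQlobberClient(c, {
//     handshake_data: encode_handshake({ token: 'example-token' })
// });
//
// Server side (assumes mqserver is the peer MQlobberServer):
// mqserver.on('handshake', function (handshake_data)
// {
//     console.log(decode_handshake(handshake_data));
// });
```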

MQlobberClient.prototype.subscribe(topic, handler, [cb])

Subscribe to messages published to the server.

Parameters:

  • {String} topic Which messages you're interested in receiving. Message topics are split into words using . as the separator. You can use * to match exactly one word in a topic or # to match zero or more words. For example, foo.* would match foo.bar whereas foo.# would match foo, foo.bar and foo.bar.wup. Note these are the default separator and wildcard characters. They can be changed on the server when constructing the QlobberFSQ object (https://github.com/davedoesdev/qlobber-fsq#qlobberfsqoptions) passed to MQlobberServer.

  • {Function} handler Function to call when a new message is received from the server due to its topic matching against topic. handler will be passed the following arguments:

    • {Readable} stream The message content as a Readable. Note that all subscribers will receive the same stream for each message.

    • {Object} info Metadata for the message, with the following properties:

      • {String} topic Topic to which the message was published.
      • {Boolean} single Whether this message is being given to at most one handler (across all clients connected to all servers).
      • {Integer} expires When the message expires (number of seconds after 1 January 1970 00:00:00 UTC). This is only present if the server's MQlobberServer instance is configured with send_expires set to true.
      • {Integer} size Size of the message in bytes. This is only present if the server's MQlobberServer instance is configured with send_size set to true.
    • {Function} done Function to call once you've handled the message. Note that calling this function is only mandatory if info.single === true, in order to clean up the message on the server. done takes one argument:

      • {Object} err If an error occurred then pass details of the error, otherwise pass null or undefined.
  • {Function} [cb] Optional function to call once the subscription has been registered with the server. This will be passed the following argument:

    • {Object} err If an error occurred then details of the error, otherwise null.

Throws:

  • {Error} If an error occurs before sending the subscribe request to the server.
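
To make the wildcard rules concrete, here's a small standalone matcher for the default separator and wildcard characters. It only illustrates the semantics described above. topic_matches is a hypothetical function and mqlobber doesn't use it:

```javascript
// Illustration only: does `pattern` match `topic`, using the default
// separator '.', '*' (exactly one word) and '#' (zero or more words)?
function topic_matches(pattern, topic)
{
    var p = pattern.split('.'),
        t = topic.split('.');

    function match(i, j)
    {
        if (i === p.length) { return j === t.length; }
        if (p[i] === '#')
        {
            // '#' may consume zero or more of the remaining words.
            for (var k = j; k <= t.length; k += 1)
            {
                if (match(i + 1, k)) { return true; }
            }
            return false;
        }
        if (j === t.length) { return false; }
        return ((p[i] === '*') || (p[i] === t[j])) && match(i + 1, j + 1);
    }

    return match(0, 0);
}

console.log(topic_matches('foo.*', 'foo.bar'));     // true
console.log(topic_matches('foo.*', 'foo.bar.wup')); // false
console.log(topic_matches('foo.#', 'foo'));         // true
console.log(topic_matches('foo.#', 'foo.bar.wup')); // true
```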

MQlobberClient.prototype.unsubscribe([topic], [handler], [cb])

Unsubscribe from messages published to the server.

Parameters:

  • {String} [topic] Which messages you're no longer interested in receiving via the handler function. If topic is undefined then all handlers for all topics are unsubscribed.
  • {Function} [handler] The function you no longer want to be called with messages published to the topic topic. This should be a function you've previously passed to subscribe. If you subscribed handler to a different topic then it will still be called for messages which match that topic. If handler is undefined, all handlers for the topic topic are unsubscribed.
  • {Function} [cb] Optional function to call once handler has been unsubscribed from topic on the server. This will be passed the following argument:
    • {Object} err If an error occurred then details of the error, otherwise null.

Throws:

  • {Error} If an error occurs before sending the unsubscribe request to the server.

MQlobberClient.prototype.publish(topic, [options], [cb])

Publish a message to the server for interested clients to receive.

Parameters:

  • {String} topic Message topic. The topic should be a series of words separated by . (or whatever you configured QlobberFSQ with on the server).

  • {Object} [options] Optional settings for this publication:

    • {Boolean} single If true then the message will be given to at most one handler (across all clients connected to all servers). If you don't specify this then all interested handlers (across all clients) will receive it.

    • {Integer} ttl Time-to-live (in seconds) for this message. If you don't specify this then the default is taken from the QlobberFSQ instance on the server. In any case, QlobberFSQ's configured time-to-live is used to constrain ttl's maximum value.

  • {Function} [cb] Optional function to call once the server has published the message. This will be passed the following argument:

    • {Object} err If an error occurred then details of the error, otherwise null.

Return:

{Writable} Stream to which to write the message's data. Make sure you end it when you're done.

Throws:

  • {Error} If an error occurs before sending the publish request to the server.
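
A work queue can be sketched by combining single on the publishing side with done on the subscribing side. publish_job and make_worker below are hypothetical helpers, the JSON payload format and the 60 second ttl are arbitrary choices, and mq is assumed to be a connected MQlobberClient:

```javascript
// Hypothetical helper: publish a job that at most one handler will receive.
function publish_job(mq, topic, job, cb)
{
    // single and ttl are the publish options documented above.
    mq.publish(topic, { single: true, ttl: 60 }, cb).end(JSON.stringify(job));
}

// Hypothetical helper: build a subscribe handler which parses each job and
// then calls done so the server can clean up the message.
function make_worker(process_job)
{
    return function (stream, info, done)
    {
        var chunks = [];

        stream.on('data', function (chunk)
        {
            chunks.push(chunk);
        });

        stream.on('end', function ()
        {
            try
            {
                process_job(JSON.parse(Buffer.concat(chunks).toString()), info);
                done();
            }
            catch (ex)
            {
                // Report parse or processing failures through done.
                done(ex);
            }
        });
    };
}

// Usage:
// mq.subscribe('jobs.resize', make_worker(function (job) { console.log(job); }));
// publish_job(mq, 'jobs.resize', { id: 1 });
```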

MQlobberClient.events.handshake(handshake_data)

handshake event

Emitted by a MQlobberClient object after it successfully completes an initial handshake with its peer MQlobberServer object on the server.

Parameters:

  • {Buffer} handshake_data Application-specific data which the MQlobberServer object sent along with the handshake.

MQlobberClient.events.backoff()

backoff event

Emitted by a MQlobberClient object when it delays a request to the server because the connection is at full capacity. If you want to avoid buffering further requests, don't call subscribe, unsubscribe or publish until a drain event is emitted.
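
One way to follow that advice is to queue your own requests while the client is backed off and release them on drain. Here's a minimal sketch. make_gate is a hypothetical helper which relies only on the backoff and drain events:

```javascript
// Hypothetical helper: defer calls to subscribe, unsubscribe or publish
// between a 'backoff' event and the next 'drain' event.
function make_gate(mq)
{
    var backed_off = false,
        pending = [];

    mq.on('backoff', function ()
    {
        backed_off = true;
    });

    mq.on('drain', function ()
    {
        backed_off = false;
        // Run queued requests, stopping if one of them triggers backoff again.
        while (!backed_off && (pending.length > 0))
        {
            pending.shift()();
        }
    });

    return function (request)
    {
        if (backed_off)
        {
            pending.push(request);
        }
        else
        {
            request();
        }
    };
}

// Usage (assumes mq is a connected MQlobberClient):
// var gate = make_gate(mq);
// gate(function () { mq.publish('foo.bar').end('hello'); });
```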

MQlobberClient.events.drain()

drain event

Emitted by a MQlobberClient object when the multiplexing layer emits a drain event.

MQlobberClient.events.full()

full event

Emitted by a MQlobberClient object when the multiplexing layer emits a full event.

MQlobberClient.events.removed(duplex)

removed event

Emitted by a MQlobberClient object when the multiplexing layer emits a removed event.

Parameters:

  • {Duplex} duplex The multiplexed stream which has closed.

MQlobberClient.events.error(err, obj)

error event

Emitted by a MQlobberClient object if an error is emitted by the multiplexing layer (bpmux), preventing proper communication with the server.

Parameters:

  • {Object} err The error that occurred.
  • {Object} obj The object on which the error occurred.

MQlobberClient.events.warning(err, obj)

warning event

Emitted by a MQlobberClient object when a recoverable error occurs. This will usually be due to an error on an individual request or multiplexed stream.

Note that if there are no warning event listeners registered then the error will be displayed using console.error.

Parameters:

  • {Object} err The error that occurred.
  • {Object} obj The object on which the error occurred.

MQlobberServer(fsq, stream, [options])

Create a new MQlobberServer object for publishing and subscribing to messages on behalf of a client.

Parameters:

  • {QlobberFSQ | QlobberPG} fsq File system queue - an instance of QlobberFSQ. This does the heavy lifting of reading and writing messages to a directory on the file system. Alternatively, you can pass an instance of QlobberPG, which uses PostgreSQL to process messages.
  • {Duplex} stream Connection to the client. The client should use MQlobberClient on its side of the connection. How the connection is made is up to the caller - it just has to supply a Duplex. For example, net.Socket or PrimusDuplex.
  • {Object} [options] Configuration options. This is passed down to BPMux (which multiplexes message streams over the connection to the client). It also supports the following additional properties:
    • {Boolean} send_expires Whether to include message expiry time in metadata sent to the client. Defaults to false.

    • {Boolean} send_size Whether to include message size in metadata sent to the client. Defaults to false.

    • {Boolean} defer_to_final_handler If true then a message stream is only considered finished when all MQlobberServer objects finish processing it. Defaults to false.

MQlobberServer.prototype.subscribe(topic, [options], [cb])

Subscribe the connected client to messages.

Note: If the client is already subscribed to topic, this function will do nothing (other than call cb).

Parameters:

  • {String} topic Which messages the client should receive. Message topics are split into words using . as the separator. You can use * to match exactly one word in a topic or # to match zero or more words. For example, foo.* would match foo.bar whereas foo.# would match foo, foo.bar and foo.bar.wup. Note these are the default separator and wildcard characters. They can be changed when constructing the QlobberFSQ instance (https://github.com/davedoesdev/qlobber-fsq#qlobberfsqoptions) passed to MQlobberServer's constructor.

  • {Object} [options] Optional settings for this subscription:

    • {Boolean} subscribe_to_existing If true then the client will be sent any existing, unexpired messages that match topic, as well as new ones. Defaults to false (only new messages).
  • {Function} [cb] Optional function to call once the subscription has been made. This will be passed the following arguments:

    • {Object} err If an error occurred then details of the error, otherwise null.

    • {Integer} n The number of subscriptions made (0 if topic was already subscribed to, 1 if not).

MQlobberServer.prototype.unsubscribe([topic], [cb])

Unsubscribe the connected client from messages.

Parameters:

  • {String} [topic] Which messages the client should no longer receive. If topic is undefined then the client will receive no more messages at all.
  • {Function} [cb] Optional function to call once the subscription has been removed. This will be passed the following arguments:
    • {Object} err If an error occurred then details of the error, otherwise null.

    • {Integer} n The number of subscriptions removed.

MQlobberServer.events.subscribe_requested(topic, cb)

subscribe_requested event

Emitted by a MQlobberServer object when it receives a request from its peer MQlobberClient object to subscribe to messages published to a topic.

If there are no listeners on this event, the default action is to call subscribe(topic, cb). If you add a listener on this event, the default action will not be called. This gives you the opportunity to filter subscription requests in the application.

Parameters:

  • {String} topic The topic to which the client is asking to subscribe.
  • {Function} cb Function to call after processing the subscription request. This function must be called even if you don't call subscribe yourself. It takes the following arguments:
    • {Object} err If null then a success status is returned to the client (whether you called subscribe or not). Otherwise, the client gets a failed status and a warning event is emitted with err.

    • {Integer} n The number of subscriptions made.

    • {Buffer} [data] Optional data to return to the client.
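
For example, subscriptions could be restricted to a fixed set of topics. This is a sketch which assumes mqserver is a MQlobberServer. restrict_subscriptions and the allow-list are hypothetical:

```javascript
// Hypothetical helper: only allow subscriptions to topics listed in `allowed`.
function restrict_subscriptions(mqserver, allowed)
{
    mqserver.on('subscribe_requested', function (topic, cb)
    {
        if (allowed.indexOf(topic) < 0)
        {
            // The client gets a failed status and a warning event is emitted.
            return cb(new Error('subscription not allowed: ' + topic));
        }
        // Same as the default action.
        mqserver.subscribe(topic, cb);
    });
}

// Usage (fsq and c as in server.js from the Example section):
// restrict_subscriptions(new MQlobberServer(fsq, c), ['foo.bar', 'foo.*']);
```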

MQlobberServer.events.unsubscribe_requested(topic, cb)

unsubscribe_requested event

Emitted by a MQlobberServer object when it receives a request from its peer MQlobberClient object to unsubscribe from messages published to a topic.

If there are no listeners on this event, the default action is to call unsubscribe(topic, cb). If you add a listener on this event, the default action will not be called. This gives you the opportunity to filter unsubscription requests in the application.

Parameters:

  • {String} topic The topic from which the client is asking to unsubscribe.
  • {Function} cb Function to call after processing the unsubscription request. This function must be called even if you don't call unsubscribe yourself. It takes the following arguments:
    • {Object} err If null then a success status is returned to the client (whether you called unsubscribe or not). Otherwise, the client gets a failed status and a warning event is emitted with err.

    • {Integer} n The number of subscriptions removed.

    • {Buffer} [data] Optional data to return to the client.

MQlobberServer.events.unsubscribe_all_requested(cb)

unsubscribe_all_requested event

Emitted by a MQlobberServer object when it receives a request from its peer MQlobberClient object to unsubscribe from all messages published to any topic.

If there are no listeners on this event, the default action is to call unsubscribe(cb). If you add a listener on this event, the default action will not be called. This gives you the opportunity to filter unsubscription requests in the application.

Parameters:

  • {Function} cb Function to call after processing the unsubscription request. This function must be called even if you don't call unsubscribe yourself. It takes the following arguments:
    • {Object} err If null then a success status is returned to the client (whether you called unsubscribe or not). Otherwise, the client gets a failed status and a warning event is emitted with err.

    • {Integer} n The number of subscriptions removed.

    • {Buffer} [data] Optional data to return to the client.

MQlobberServer.events.publish_requested(topic, stream, options, cb)

publish_requested event

Emitted by a MQlobberServer object when it receives a request from its peer MQlobberClient object to publish a message to a topic.

If there are no listeners on this event, the default action is to call stream.pipe(fsq.publish(topic, options, cb)), where fsq is the QlobberFSQ instance you passed to MQlobberServer's constructor.

Parameters:

  • {String} topic The topic to which the message should be published.

  • {Readable} stream The message data as a Readable. This is multiplexed over the connection to the client - back-pressure is applied to the sender MQlobberClient object according to when you call read.

  • {Object} options Optional settings for this publication:

    • {Boolean} single If true then the message should be published to at most one client (across all servers). Otherwise, it should be published to all interested clients.

    • {Integer} ttl Time-to-live (in seconds) for this message.

  • {Function} cb Function to call after processing the publication request. This function must be called even if you don't call publish yourself. It takes the following arguments:

    • {Object} err If null then a success status is returned to the client (whether you called publish or not). Otherwise, the client gets a failed status and a warning event is emitted with err.

    • {Buffer} [data] Optional data to return to the client.
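
A listener can, for instance, refuse topics a client isn't allowed to publish to and otherwise fall back to the default behaviour. In this sketch, restrict_publish and the is_allowed predicate are hypothetical. Draining a refused stream with resume() is an assumption about what a well-behaved listener should do, not documented mqlobber behaviour:

```javascript
// Hypothetical helper: only publish messages whose topic passes is_allowed.
function restrict_publish(mqserver, fsq, is_allowed)
{
    mqserver.on('publish_requested', function (topic, stream, options, cb)
    {
        if (!is_allowed(topic))
        {
            // Assumption: discard the unwanted data so the multiplexed
            // stream isn't left unread.
            stream.resume();
            return cb(new Error('publish not allowed: ' + topic));
        }
        // Same as the default action.
        stream.pipe(fsq.publish(topic, options, cb));
    });
}

// Usage (fsq and c as in server.js from the Example section):
// restrict_publish(new MQlobberServer(fsq, c), fsq, function (topic)
// {
//     return topic.lastIndexOf('public.', 0) === 0;
// });
```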

MQlobberServer.events.message(stream, info, multiplex, done)

message event

Emitted by a MQlobberServer object when its QlobberFSQ object passes it a message published to a topic to which its peer MQlobberClient object has subscribed.

If there are no listeners on this event, the default action is to call stream.pipe(multiplex()).

You can add a listener on this event to insert processing between the message stream and the client.

Parameters:

  • {Readable} stream The message content as a Readable. Note that all subscribers will receive the same stream for each message.

  • {Object} info Metadata for the message, with the following properties:

    • {String} topic Topic to which the message was published.
    • {Boolean} single Whether this message is being given to at most one handler (across all clients connected to all servers).
    • {Integer} expires When the message expires (number of seconds after 1 January 1970 00:00:00 UTC). This is only present if the MQlobberServer object was configured with send_expires set to true.
    • {Integer} size Size of the message in bytes. This is only present if the server's MQlobberServer instance is configured with send_size set to true.
  • {Function} multiplex Function to call in order to multiplex a new stream over the connection to the client. It returns the multiplexed stream, to which the data from stream should be written - after the application applies whatever transforms and processing it requires.

  • {Function} done If you don't call multiplex then you should call this function to indicate you have finished handling the message. done takes the following optional argument:

    • {Object} [err] If an error occurred while handling the message, pass it here.
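
As an illustration of inserting processing, the sketch below upper-cases message data before it's multiplexed to the client. upper_case and transform_messages are hypothetical names, and mqserver is assumed to be a MQlobberServer. Error handling is left out for brevity:

```javascript
var Transform = require('stream').Transform;

// Hypothetical transform: upper-case everything passing through.
function upper_case()
{
    return new Transform(
    {
        transform: function (chunk, encoding, cb)
        {
            cb(null, chunk.toString().toUpperCase());
        }
    });
}

// Hypothetical helper: route every message through upper_case() on its way
// to the client.
function transform_messages(mqserver)
{
    mqserver.on('message', function (stream, info, multiplex)
    {
        stream.pipe(upper_case()).pipe(multiplex());
    });
}

// Usage (fsq and c as in server.js from the Example section):
// transform_messages(new MQlobberServer(fsq, c));
```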

MQlobberServer.events.handshake(handshake_data, delay_handshake)

handshake event

Emitted by a MQlobberServer object after it receives an initial handshake message from its peer MQlobberClient object on the client.

Parameters:

  • {Buffer} handshake_data Application-specific data which the MQlobberClient object sent along with the handshake.
  • {Function} delay_handshake By default, MQlobberServer replies to MQlobberClient's handshake message as soon as your event handler returns and doesn't attach any application-specific handshake data. If you wish to delay the handshake message or provide handshake data, call delay_handshake. It returns another function which you can call at any time to send the handshake message. The returned function takes a single argument:
    • {Buffer} [handshake_data] Application-specific handshake data to send to the client. The client-side MQlobberClient object will emit this as a handshake event to its application.
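
For example, the server could check a credential before completing the handshake. In this sketch, authenticate_handshake and the check callback are hypothetical and the reply payloads are arbitrary. What to do with a client that fails the check (such as ending its connection) is left to the application:

```javascript
// Hypothetical helper: hold back the handshake reply until an asynchronous
// check of the client's handshake data has completed.
function authenticate_handshake(mqserver, check)
{
    mqserver.on('handshake', function (handshake_data, delay_handshake)
    {
        // Calling delay_handshake stops the automatic reply and returns the
        // function which sends it.
        var send_handshake = delay_handshake();

        check(handshake_data, function (err, ok)
        {
            send_handshake(Buffer.from((!err && ok) ? 'welcome' : 'denied'));
        });
    });
}

// Usage (assumes mqserver is a MQlobberServer; the comparison is made up):
// authenticate_handshake(mqserver, function (data, cb)
// {
//     cb(null, data.toString() === 'example-token');
// });
```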

MQlobberServer.events.backoff()

backoff event

Emitted by a MQlobberServer object when it delays a message to the client because the connection is at full capacity.

If you want to avoid buffering further messages, use a filter function (see QlobberFSQ's constructor) to prevent messages being sent until a drain event is emitted. In the filter function, a handler owned by a MQlobberServer object will have a property named mqlobber_server set to the MQlobberServer object.

You can also use event listeners on subscribe_requested, unsubscribe_requested, unsubscribe_all_requested and publish_requested to prevent responses being sent to the client until a drain event is emitted.

Depending on your application, you might also terminate the connection if it can't keep up.

MQlobberServer.events.drain()

drain event

Emitted by a MQlobberServer object when the multiplexing layer emits a drain event.

MQlobberServer.events.full()

full event

Emitted by a MQlobberServer object when the multiplexing layer emits a full event.

MQlobberServer.events.removed(duplex)

removed event

Emitted by a MQlobberServer object when the multiplexing layer emits a removed event.

Parameters:

  • {Duplex} duplex The multiplexed stream which has closed.

MQlobberServer.events.ack(info)

ack event

Emitted by a MQlobberServer object when the client has acknowledged receipt of a message.

Parameters:

  • {Object} info Metadata for the message, with the following properties:
    • {String} topic Topic to which the message was published.
    • {Boolean} single Always true because acknowledgements are only supported for messages which were given to a single handler (across all clients connected to all servers).
    • {Integer} expires When the message expires (number of milliseconds after 1 January 1970 00:00:00 UTC).

MQlobberServer.events.error(err, obj)

error event

Emitted by a MQlobberServer object if an error is emitted by the multiplexing layer (bpmux), preventing proper communication with the client.

Parameters:

  • {Object} err The error that occurred.
  • {Object} obj The object on which the error occurred.

MQlobberServer.events.warning(err, obj)

warning event

Emitted by a MQlobberServer object when a recoverable error occurs. This will usually be due to an error on an individual request or multiplexed stream.

Note that if there are no warning event listeners registered then the error will be displayed using console.error.

Parameters:

  • {Object} err The error that occurred.
  • {Object} obj The object on which the error occurred.

—generated by apidox
