diff --git a/README.md b/README.md index 6572a7c..1ea4db0 100644 --- a/README.md +++ b/README.md @@ -147,6 +147,10 @@ The following functions are promisified: - `MqttClient#unsubscribe` - `MqttClient#end` +### IPC Support + +`mqttletoad` supports connecting to an MQTT broker running on a named pipe. + ## Install **Node.js v7.0.0 or greater required**. @@ -201,6 +205,9 @@ const myfunc = async () => { // disconnect await client.end(); + + // IPC support (mqtt only; not ws) + const client = await toad.connect({path: '/path/to/my/named/pipe'}); } ``` diff --git a/lib/index.js b/lib/index.js index 538bf4d..2c827c2 100644 --- a/lib/index.js +++ b/lib/index.js @@ -2,6 +2,7 @@ const MQTT = require('mqtt'); const pify = require('pify'); +const net = require('net'); const {EventEmitter2} = require('eventemitter2'); const decoders = require('./decoders'); const encoders = require('./encoders'); @@ -177,17 +178,20 @@ exports.connect = async (url, opts = {}) => { opts = normalizeOptions(opts); args = [url, opts]; } else { - url = normalizeOptions(url); - args = [url]; + opts = normalizeOptions(url); + args = [opts]; } return new Promise((resolve, reject) => { - MQTT.connect(...args) + (opts.path + ? MQTT.MqttClient(() => net.createConnection(opts.path)) + : MQTT.connect(...args) + ) .on('connect', function(connack) { /** - * If `false`, this is a clean session - * @public - * @memberOf client - */ + * If `false`, this is a clean session + * @public + * @memberOf client + */ this.sessionPresent = Boolean(connack.sessionPresent); }) .once('error', reject) diff --git a/test/harness/index.js b/test/harness/index.js index a219a79..25b53f5 100644 --- a/test/harness/index.js +++ b/test/harness/index.js @@ -85,6 +85,7 @@ exports.createBroker = async (port, transformers = {}) => { }); }); broker.transformers = transformers; + broker.port = port; return new Promise((resolve, reject) => { broker.listen(port, err => { if (err) { diff --git a/test/mqttletoad.spec.js b/test/mqttletoad.spec.js index 0d0c95f..b27da27 100644 --- a/test/mqttletoad.spec.js +++ b/test/mqttletoad.spec.js @@ -6,96 +6,126 @@ const {MqttClient} = require('mqtt'); const {connect} = require('..'); const {createBroker} = require('./harness'); const getPort = require('get-port'); +const os = require('os'); +const path = require('path'); describe('mqttletoad', function() { let broker; let port; - describe('connect()', function() { - describe('when given no arguments', function() { - it('should reject', async function() { - return expect(connect(), 'to be rejected with', /invalid/i); - }); - }); + describe('method', function() { + describe('connect()', function() { + describe('IPC', function() { + let client; - describe('when given a valid connection object', function() { - let client; + beforeEach(async function() { + broker = await createBroker( + path.join(os.tmpdir(), `mqttletoad-${Date.now()}`) + ); + }); - beforeEach(async function() { - port = await getPort(); - broker = await createBroker(port); - }); + afterEach(function(done) { + client.end().then(() => { + broker.close(done); + }); + }); - it('should fulfill', async function() { - const promise = connect({host: 'localhost', port, protocol: 'mqtt'}); - client = await expect(promise, 'to be fulfilled'); - return client.end(); + it('should allow connection via a path', async function() { + client = await connect({path: broker.port}); + }); }); - afterEach(function(done) { - client.end().then(() => { - broker.close(done); + describe('TCP', function() { + describe('when given no arguments', function() { + it('should reject', async function() { + return expect(connect(), 'to be rejected with', /invalid/i); + }); }); - }); - }); - describe('upon first connection', function() { - let promise; + describe('when given a valid connection object', function() { + let client; - beforeEach(async function() { - port = await getPort(); - broker = await createBroker(port); - promise = connect(`mqtt://localhost:${port}`); - }); + beforeEach(async function() { + port = await getPort(); + broker = await createBroker(port); + }); + + it('should fulfill', async function() { + const promise = connect({ + host: 'localhost', + port, + protocol: 'mqtt' + }); + client = await expect(promise, 'to be fulfilled'); + return client.end(); + }); - afterEach(function(done) { - promise.then(client => client.end()).then(() => { - broker.close(done); + afterEach(function(done) { + client.end().then(() => { + broker.close(done); + }); + }); }); - }); - it('should resolve with the wrapped MqttClient once connected', async function() { - return expect( - promise, - 'when fulfilled', - expect.it('to be a', MqttClient) - ); - }); + describe('upon first connection', function() { + let promise; - it('should assign `sessionPresent` property', async function() { - return expect( - promise, - 'when fulfilled', - expect.it('to have property', 'sessionPresent', false) - ); - }); - }); + beforeEach(async function() { + port = await getPort(); + broker = await createBroker(port); + promise = connect(`mqtt://localhost:${port}`); + }); - describe('upon subsequent connections', function() { - let client; + afterEach(function(done) { + promise.then(client => client.end()).then(() => { + broker.close(done); + }); + }); - beforeEach(async function() { - port = await getPort(); - broker = await createBroker(port); - client = await connect(`mqtt://localhost:${port}`); - broker.transformers.connack = _ => ({ - returnCode: 0, - sessionPresent: true - }); - client.stream.end(); - // at this point, it should automatically reconnect - }); + it('should resolve with the wrapped MqttClient once connected', async function() { + return expect( + promise, + 'when fulfilled', + expect.it('to be a', MqttClient) + ); + }); - afterEach(function(done) { - client.end().then(() => { - broker.close(done); + it('should assign `sessionPresent` property', async function() { + return expect( + promise, + 'when fulfilled', + expect.it('to have property', 'sessionPresent', false) + ); + }); }); - }); - it('should update `sessionPresent` accordingly', function(done) { - client.once('connect', () => { - expect(client.sessionPresent, 'to be', true); - done(); + describe('upon subsequent connections', function() { + let client; + + beforeEach(async function() { + port = await getPort(); + broker = await createBroker(port); + client = await connect(`mqtt://localhost:${port}`); + broker.transformers.connack = _ => ({ + returnCode: 0, + sessionPresent: true + }); + client.stream.end(); + // at this point, it should automatically reconnect + }); + + afterEach(function(done) { + client.end().then(() => { + broker.close(done); + }); + }); + + it('should update `sessionPresent` accordingly', function(done) { + client.once('connect', () => { + expect(client.sessionPresent, 'to be', true); + done(); + }); + }); }); }); });