Skip to content
This repository was archived by the owner on Oct 28, 2022. It is now read-only.

Commit 176f390

Browse files
committed
feat(ipc): add support for IPC connections
1 parent 119e0e9 commit 176f390

File tree

4 files changed

+117
-75
lines changed

4 files changed

+117
-75
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ The following functions are promisified:
147147
- `MqttClient#unsubscribe`
148148
- `MqttClient#end`
149149

150+
### IPC Support
151+
152+
`mqttletoad` supports connecting to an MQTT broker running on a named pipe.
153+
150154
## Install
151155

152156
**Node.js v7.0.0 or greater required**.
@@ -201,6 +205,9 @@ const myfunc = async () => {
201205

202206
// disconnect
203207
await client.end();
208+
209+
// IPC support (mqtt only; not ws)
210+
const client = await toad.connect({path: '/path/to/my/named/pipe'});
204211
}
205212
```
206213

lib/index.js

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
const MQTT = require('mqtt');
44
const pify = require('pify');
5+
const net = require('net');
56
const {EventEmitter2} = require('eventemitter2');
67
const decoders = require('./decoders');
78
const encoders = require('./encoders');
@@ -177,17 +178,20 @@ exports.connect = async (url, opts = {}) => {
177178
opts = normalizeOptions(opts);
178179
args = [url, opts];
179180
} else {
180-
url = normalizeOptions(url);
181-
args = [url];
181+
opts = normalizeOptions(url);
182+
args = [opts];
182183
}
183184
return new Promise((resolve, reject) => {
184-
MQTT.connect(...args)
185+
(opts.path
186+
? MQTT.MqttClient(() => net.createConnection(opts.path))
187+
: MQTT.connect(...args)
188+
)
185189
.on('connect', function(connack) {
186190
/**
187-
* If `false`, this is a clean session
188-
* @public
189-
* @memberOf client
190-
*/
191+
* If `false`, this is a clean session
192+
* @public
193+
* @memberOf client
194+
*/
191195
this.sessionPresent = Boolean(connack.sessionPresent);
192196
})
193197
.once('error', reject)

test/harness/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ exports.createBroker = async (port, transformers = {}) => {
8585
});
8686
});
8787
broker.transformers = transformers;
88+
broker.port = port;
8889
return new Promise((resolve, reject) => {
8990
broker.listen(port, err => {
9091
if (err) {

test/mqttletoad.spec.js

Lines changed: 98 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -6,96 +6,126 @@ const {MqttClient} = require('mqtt');
66
const {connect} = require('..');
77
const {createBroker} = require('./harness');
88
const getPort = require('get-port');
9+
const os = require('os');
10+
const path = require('path');
911

1012
describe('mqttletoad', function() {
1113
let broker;
1214
let port;
1315

14-
describe('connect()', function() {
15-
describe('when given no arguments', function() {
16-
it('should reject', async function() {
17-
return expect(connect(), 'to be rejected with', /invalid/i);
18-
});
19-
});
16+
describe('method', function() {
17+
describe('connect()', function() {
18+
describe('IPC', function() {
19+
let client;
2020

21-
describe('when given a valid connection object', function() {
22-
let client;
21+
beforeEach(async function() {
22+
broker = await createBroker(
23+
path.join(os.tmpdir(), `mqttletoad-${Date.now()}`)
24+
);
25+
});
2326

24-
beforeEach(async function() {
25-
port = await getPort();
26-
broker = await createBroker(port);
27-
});
27+
afterEach(function(done) {
28+
client.end().then(() => {
29+
broker.close(done);
30+
});
31+
});
2832

29-
it('should fulfill', async function() {
30-
const promise = connect({host: 'localhost', port, protocol: 'mqtt'});
31-
client = await expect(promise, 'to be fulfilled');
32-
return client.end();
33+
it('should allow connection via a path', async function() {
34+
client = await connect({path: broker.port});
35+
});
3336
});
3437

35-
afterEach(function(done) {
36-
client.end().then(() => {
37-
broker.close(done);
38+
describe('TCP', function() {
39+
describe('when given no arguments', function() {
40+
it('should reject', async function() {
41+
return expect(connect(), 'to be rejected with', /invalid/i);
42+
});
3843
});
39-
});
40-
});
4144

42-
describe('upon first connection', function() {
43-
let promise;
45+
describe('when given a valid connection object', function() {
46+
let client;
4447

45-
beforeEach(async function() {
46-
port = await getPort();
47-
broker = await createBroker(port);
48-
promise = connect(`mqtt://localhost:${port}`);
49-
});
48+
beforeEach(async function() {
49+
port = await getPort();
50+
broker = await createBroker(port);
51+
});
52+
53+
it('should fulfill', async function() {
54+
const promise = connect({
55+
host: 'localhost',
56+
port,
57+
protocol: 'mqtt'
58+
});
59+
client = await expect(promise, 'to be fulfilled');
60+
return client.end();
61+
});
5062

51-
afterEach(function(done) {
52-
promise.then(client => client.end()).then(() => {
53-
broker.close(done);
63+
afterEach(function(done) {
64+
client.end().then(() => {
65+
broker.close(done);
66+
});
67+
});
5468
});
55-
});
5669

57-
it('should resolve with the wrapped MqttClient once connected', async function() {
58-
return expect(
59-
promise,
60-
'when fulfilled',
61-
expect.it('to be a', MqttClient)
62-
);
63-
});
70+
describe('upon first connection', function() {
71+
let promise;
6472

65-
it('should assign `sessionPresent` property', async function() {
66-
return expect(
67-
promise,
68-
'when fulfilled',
69-
expect.it('to have property', 'sessionPresent', false)
70-
);
71-
});
72-
});
73+
beforeEach(async function() {
74+
port = await getPort();
75+
broker = await createBroker(port);
76+
promise = connect(`mqtt://localhost:${port}`);
77+
});
7378

74-
describe('upon subsequent connections', function() {
75-
let client;
79+
afterEach(function(done) {
80+
promise.then(client => client.end()).then(() => {
81+
broker.close(done);
82+
});
83+
});
7684

77-
beforeEach(async function() {
78-
port = await getPort();
79-
broker = await createBroker(port);
80-
client = await connect(`mqtt://localhost:${port}`);
81-
broker.transformers.connack = _ => ({
82-
returnCode: 0,
83-
sessionPresent: true
84-
});
85-
client.stream.end();
86-
// at this point, it should automatically reconnect
87-
});
85+
it('should resolve with the wrapped MqttClient once connected', async function() {
86+
return expect(
87+
promise,
88+
'when fulfilled',
89+
expect.it('to be a', MqttClient)
90+
);
91+
});
8892

89-
afterEach(function(done) {
90-
client.end().then(() => {
91-
broker.close(done);
93+
it('should assign `sessionPresent` property', async function() {
94+
return expect(
95+
promise,
96+
'when fulfilled',
97+
expect.it('to have property', 'sessionPresent', false)
98+
);
99+
});
92100
});
93-
});
94101

95-
it('should update `sessionPresent` accordingly', function(done) {
96-
client.once('connect', () => {
97-
expect(client.sessionPresent, 'to be', true);
98-
done();
102+
describe('upon subsequent connections', function() {
103+
let client;
104+
105+
beforeEach(async function() {
106+
port = await getPort();
107+
broker = await createBroker(port);
108+
client = await connect(`mqtt://localhost:${port}`);
109+
broker.transformers.connack = _ => ({
110+
returnCode: 0,
111+
sessionPresent: true
112+
});
113+
client.stream.end();
114+
// at this point, it should automatically reconnect
115+
});
116+
117+
afterEach(function(done) {
118+
client.end().then(() => {
119+
broker.close(done);
120+
});
121+
});
122+
123+
it('should update `sessionPresent` accordingly', function(done) {
124+
client.once('connect', () => {
125+
expect(client.sessionPresent, 'to be', true);
126+
done();
127+
});
128+
});
99129
});
100130
});
101131
});

0 commit comments

Comments
 (0)