Skip to content
This repository was archived by the owner on Oct 28, 2022. It is now read-only.

Commit 91128a1

Browse files
committed
fix(api): avoid Proxy objects, always set sessionPresent upon reconnect
this should correct some problems with - improve test against `publish()` rejection - enhance harness to test `sessionPresent` - formatting
1 parent e4def1d commit 91128a1

File tree

3 files changed

+166
-112
lines changed

3 files changed

+166
-112
lines changed

lib/index.js

Lines changed: 55 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,20 @@ const {EventEmitter2} = require('eventemitter2');
66

77
const eventify = topic => topic.replace(/#/g, '**').replace(/\+/g, '*');
88

9+
const asyncMethodNames = ['publish', 'subscribe', 'unsubscribe', 'end'];
10+
911
/**
1012
* Monkeypatches a `MqttClient` instance.
1113
* Promisifies `end`, `subscribe`, `publish`, and `unsubscribe`.
1214
* Adds special behavior around `on`, `once`, `removeListener`, `emit`, etc.
1315
* @param {MqttClient} client - MqttClient (does not mutate)
14-
* @param {Object} connack - Connection acknowledgment object
15-
* @param {boolean} connack.sessionPresent - If true, not clean session
1616
* @returns {MqttClient} Patched client
1717
*/
18-
const toadpatch = (client, connack) => {
19-
const end = pify(client.end);
20-
const subscribe = pify(client.subscribe);
21-
const publish = pify(client.publish);
22-
const unsubscribe = pify(client.unsubscribe);
23-
24-
Object.defineProperty(client, 'sessionPresent', {
25-
value: Boolean(connack.sessionPresent)
26-
});
18+
const toadpatch = client => {
19+
const asyncMethods = asyncMethodNames.reduce(
20+
(acc, name) => Object.assign(acc, {[name]: pify(client[name])}),
21+
{}
22+
);
2723

2824
/**
2925
* Adapter between MQTT topics (supporting wildcards) and the client events.
@@ -40,35 +36,33 @@ const toadpatch = (client, connack) => {
4036
* @public
4137
* @function
4238
* @param {string} topic - MQTT topic
43-
* @param {Buffer|string} message - MQTT messqage
44-
* @param {Function} listener - Listener function; called with `message` and raw `packet`
39+
* @param {Function} listener - Listener function; called with `message` and
40+
* raw `packet`
4541
* @param {Object} [opts] - Any options for MQTT subscription
4642
* @param {number} [opts.qos=0] - QoS
4743
* @returns Promise<{{topic, qos}}> Object w/ topic subscribed to and QoS
4844
* granted by broker
4945
*/
50-
client.subscribe = new Proxy(subscribe, {
51-
async apply(target, client, [topic, listener, opts = {}]) {
52-
if (typeof topic !== 'string' || typeof listener !== 'function') {
53-
throw new TypeError('Invalid parameters');
54-
}
46+
client.subscribe = async function toadSubscribe(topic, listener, opts = {}) {
47+
if (typeof topic !== 'string' || typeof listener !== 'function') {
48+
throw new TypeError('Invalid parameters');
49+
}
5550

56-
const {toad} = client;
57-
const event = eventify(topic);
58-
toad.on(event, listener);
51+
const {toad} = this;
52+
const event = eventify(topic);
53+
toad.on(event, listener);
5954

60-
// TODO: find a way to not subscribe to already-subscribed topics
61-
// TODO: note that a different QoS requires a new subscription
62-
// TODO: even if the topic is identical!
63-
try {
64-
const result = await target.apply(client, [topic, opts]);
65-
return result.shift();
66-
} catch (err) {
67-
toad.removeListener(event, listener);
68-
throw err;
69-
}
55+
// TODO: find a way to not subscribe to already-subscribed topics
56+
// TODO: note that a different QoS requires a new subscription
57+
// TODO: even if the topic is identical!
58+
try {
59+
const result = await asyncMethods.subscribe.call(this, topic, opts);
60+
return result.shift();
61+
} catch (err) {
62+
toad.removeListener(event, listener);
63+
throw err;
7064
}
71-
});
65+
};
7266

7367
/**
7468
* Topic must match exactly.
@@ -80,48 +74,42 @@ const toadpatch = (client, connack) => {
8074
* @param {Function} listener - Listener function to remove
8175
* @returns {Promise<void>}
8276
*/
83-
client.unsubscribe = new Proxy(unsubscribe, {
84-
async apply(target, client, [topic, listener]) {
85-
const {toad} = client;
86-
const event = eventify(topic);
87-
toad.removeListener(event, listener);
88-
if (!toad.listenerCount(event)) {
89-
return target.apply(client, topic);
90-
}
77+
client.unsubscribe = async function toadUnsubscribe(topic, listener) {
78+
const {toad} = this;
79+
const event = eventify(topic);
80+
toad.removeListener(event, listener);
81+
if (!toad.listenerCount(event)) {
82+
return asyncMethods.unsubscribe.call(this, topic);
9183
}
92-
});
84+
};
9385

9486
/**
9587
* Disconnects client (if connected)
9688
* @function
9789
* @public
9890
* @returns {Promise<void>}
9991
*/
100-
client.end = new Proxy(end, {
101-
async apply(target, client, ...args) {
102-
if (client.connected) {
103-
return target.apply(client, ...args);
104-
}
92+
client.end = async function toadEnd(force) {
93+
if (this.connected) {
94+
await asyncMethods.end.call(this, force);
95+
this.disconnecting = false;
10596
}
106-
});
97+
};
10798

10899
/**
109100
* Publishes a message to a topic
110101
* @function
102+
* @public
111103
* @returns {Promise<void>}
112104
*/
113-
client.publish = new Proxy(publish, {
114-
async apply(target, client, ...args) {
115-
return target.apply(client, ...args);
116-
}
117-
});
105+
client.publish = asyncMethods.publish;
118106

119107
/**
120108
* On any received message, delegate to the internal EE2 instance
121109
* where the real listeners for subscriptions are stored.
122110
*/
123-
client.on('message', (topic, message, packet) => {
124-
client.toad.emit(eventify(topic), message, packet);
111+
client.on('message', function(topic, message, packet) {
112+
this.toad.emit(eventify(topic), message, packet);
125113
});
126114

127115
return client;
@@ -135,11 +123,20 @@ const toadpatch = (client, connack) => {
135123
* @returns {Promise<MqttClient>} Patched `MqttClient` instance
136124
*/
137125
exports.connect = (...args) => {
138-
const client = MQTT.connect(...args);
139126
return new Promise((resolve, reject) => {
140-
client.on('error', reject).on('connect', connack => {
141-
resolve(toadpatch(client, connack));
142-
});
127+
MQTT.connect(...args)
128+
.on('connect', function(connack) {
129+
/**
130+
* If `false`, this is a clean session
131+
* @public
132+
* @memberOf client
133+
*/
134+
this.sessionPresent = Boolean(connack.sessionPresent);
135+
})
136+
.once('error', reject)
137+
.once('connect', function() {
138+
resolve(toadpatch(this));
139+
});
143140
});
144141
};
145142

test/harness/index.js

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
'use strict';
22

3+
const _ = require('lodash');
34
const {Server} = require('net');
45
const MqttConnection = require('mqtt-connection');
56

@@ -17,31 +18,45 @@ class Broker extends Server {
1718

1819
exports.Broker = Broker;
1920

20-
exports.createBroker = () =>
21-
new Broker(client => {
21+
exports.createBroker = async (port, transformers = {}) => {
22+
transformers = _.defaults(transformers, {
23+
connack(...args) {
24+
return {returnCode: 0};
25+
},
26+
subscribe(packet) {
27+
return {
28+
messageId: packet.messageId,
29+
granted: packet.subscriptions.map(e => e.qos)
30+
};
31+
},
32+
pingreq: _.noop,
33+
unsubscribe: _.identity,
34+
pubrel: _.identity,
35+
pubrec: _.identity,
36+
publish: _.identity
37+
});
38+
const broker = new Broker(client => {
2239
client
23-
.on('connect', () => {
24-
client.connack({returnCode: 0});
40+
.on('connect', (...args) => {
41+
client.connack(transformers.connack(...args));
2542
})
26-
.on('pingreq', () => {
27-
client.pingresp();
43+
.on('pingreq', (...args) => {
44+
client.pingresp(transformers.pingreq(...args));
2845
})
29-
.on('subscribe', packet => {
30-
client.suback({
31-
messageId: packet.messageId,
32-
granted: packet.subscriptions.map(e => e.qos)
33-
});
46+
.on('subscribe', (...args) => {
47+
client.suback(transformers.subscribe(...args));
3448
})
35-
.on('unsubscribe', packet => {
36-
client.unsuback(packet);
49+
.on('unsubscribe', (...args) => {
50+
client.unsuback(transformers.unsubscribe(...args));
3751
})
38-
.on('pubrel', packet => {
39-
client.pubcomp(packet);
52+
.on('pubrel', (...args) => {
53+
client.pubcomp(transformers.pubrel(...args));
4054
})
41-
.on('pubrec', packet => {
42-
client.pubrel(packet);
55+
.on('pubrec', (...args) => {
56+
client.pubrel(transformers.pubrec(...args));
4357
})
4458
.on('publish', packet => {
59+
packet = transformers.publish(packet);
4560
process.nextTick(() => {
4661
client.publish(packet);
4762
switch (packet.qos) {
@@ -69,3 +84,13 @@ exports.createBroker = () =>
6984
client.destroy();
7085
});
7186
});
87+
broker.transformers = transformers;
88+
return new Promise((resolve, reject) => {
89+
broker.listen(port, err => {
90+
if (err) {
91+
return reject(err);
92+
}
93+
resolve(broker);
94+
});
95+
});
96+
};

test/mqttletoad.spec.js

Lines changed: 69 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,66 +11,98 @@ describe('mqttletoad', function() {
1111
let broker;
1212
let port;
1313

14-
before(async function() {
15-
port = await getPort();
16-
});
14+
describe('connect()', function() {
15+
describe('upon first connection', function() {
16+
let promise;
1717

18-
beforeEach(async function() {
19-
broker = createBroker();
20-
return new Promise((resolve, reject) => {
21-
broker.listen(port, err => {
22-
if (err) {
23-
return reject(err);
24-
}
25-
resolve();
18+
beforeEach(async function() {
19+
port = await getPort();
20+
broker = await createBroker(port);
21+
promise = connect(`mqtt://localhost:${port}`);
2622
});
27-
});
28-
});
2923

30-
afterEach(function(done) {
31-
broker.close(done);
32-
});
24+
afterEach(function(done) {
25+
promise.then(client => client.end()).then(() => {
26+
broker.close(done);
27+
});
28+
});
3329

34-
describe('connect()', function() {
35-
let promise;
30+
it('should resolve with the wrapped MqttClient once connected', async function() {
31+
return expect(
32+
promise,
33+
'when fulfilled',
34+
expect.it('to be a', MqttClient)
35+
);
36+
});
3637

37-
beforeEach(function() {
38-
promise = connect(`mqtt://localhost:${port}`);
39-
});
38+
it('should not allow an invalid message to be published', async function() {
39+
const client = await promise;
40+
return expect(client.publish('foo/bar', new Date()), 'to be rejected');
41+
});
4042

41-
afterEach(async function() {
42-
const client = await promise;
43-
await client.end();
43+
it('should assign `sessionPresent` property', async function() {
44+
return expect(
45+
promise,
46+
'when fulfilled',
47+
expect.it('to have property', 'sessionPresent', false)
48+
);
49+
});
4450
});
4551

46-
it('should resolve with the wrapped MqttClient once connected', async function() {
47-
return expect(
48-
promise,
49-
'when fulfilled',
50-
expect.it('to be a', MqttClient)
51-
);
52-
});
52+
describe('upon subsequent connections', function() {
53+
let client;
54+
55+
beforeEach(async function() {
56+
port = await getPort();
57+
broker = await createBroker(port);
58+
client = await connect(`mqtt://localhost:${port}`);
59+
broker.transformers.connack = _ => ({
60+
returnCode: 0,
61+
sessionPresent: true
62+
});
63+
client.stream.end();
64+
// at this point, it should automatically reconnect
65+
});
66+
67+
afterEach(function(done) {
68+
client.end().then(() => {
69+
broker.close(done);
70+
});
71+
});
5372

54-
it('should not allow an invalid message to be published', async function() {
55-
const client = await promise;
56-
return expect(client.publish('foo/bar', new Date()), 'to be rejected');
73+
it('should update `sessionPresent` accordingly', function(done) {
74+
client.once('connect', () => {
75+
expect(client.sessionPresent, 'to be', true);
76+
done();
77+
});
78+
});
5779
});
5880
});
5981

6082
describe('mqttletoad client', function() {
6183
let client;
84+
let port;
85+
let broker;
6286

6387
beforeEach(async function() {
88+
port = await getPort();
89+
broker = await createBroker(port);
6490
client = await connect(`mqtt://localhost:${port}`);
6591
});
6692

67-
afterEach(async function() {
68-
return client.end();
93+
afterEach(function(done) {
94+
client.end().then(() => {
95+
broker.close(done);
96+
});
6997
});
7098

7199
describe('publish()', function() {
72100
it('should reject if non-string or non-Buffer value', async function() {
73-
return expect(client.publish('foo', new Date()), 'to be rejected');
101+
return expect(
102+
client.publish('foo', new Date()),
103+
'to be rejected with',
104+
TypeError
105+
);
74106
});
75107

76108
describe('default QoS (0)', function() {

0 commit comments

Comments
 (0)