Skip to content
This repository was archived by the owner on Oct 28, 2022. It is now read-only.

Commit 7863d26

Browse files
committed
fix(api): don't proxy EventEmitter
BREAKING CHANGE: This significant API change proxies `subscribe` and `publish` instead of `on`, `once`, and `emit`. There is no longer an equivalent for `once`. This was done to keep compatibility with the "interface" of an `EventEmitter`, among other things. Also it will require Node.js v7 or newer due to use of features unsupported in earlier versions. Furthermore, it fixes connection issues arising from misuse of the `pify` module.
1 parent d332347 commit 7863d26

File tree

6 files changed

+317
-348
lines changed

6 files changed

+317
-348
lines changed

.travis.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ notifications:
44
node_js:
55
- '8'
66
- '7'
7-
- '6.5'
87
after_success:
98
- nyc report --reporter=text-lcov | node_modules/.bin/coveralls
109
- npm run semantic-release

README.md

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ What's better is that `EventEmitter`s are standardized. They are easy to consum
5959

6060
### Promise Support
6161

62-
[async-mqtt](https://npm.im/async-mqtt) does the same thing. I wanted it.
62+
[async-mqtt](https://npm.im/async-mqtt) does the same thing here--more or less.
6363

6464
### WONTFIX: Message Formats
6565

@@ -95,7 +95,7 @@ client.on('radscript/4ever/format/+', (buf, {topic})=> {
9595

9696
## Install
9797

98-
**Node.js v6.5 or greater required**.
98+
**Node.js v7.0.0 or greater required**.
9999

100100
```bash
101101
$ npm install mqttletoad
@@ -104,43 +104,44 @@ $ npm install mqttletoad
104104
## Usage
105105

106106
```js
107-
async function connect() {
108-
const toad = require('mqttletoad');
109-
const client = await toad.connect('wxs://test.mosquitto.org')
110-
// a "real" `MqttClient` event. these are whitelisted
107+
const toad = require('mqttletoad');
108+
109+
const myfunc = async () => {
110+
const client = await toad.connect('wss://test.mosquitto.org');
111+
111112
client.on('disconnect', () => {
112-
console.warn('disconnected; reconnecting...');
113+
console.warn('client disconnected');
113114
})
114-
.on('winken/+/nod', (buf, packet) => {
115-
// an MQTT event
116-
console.log(`topic: "${packet.topic}", message: "${String(buf)}"`);
117-
}, {qos: 1})
118-
.on('suback', ({topic}) => {
119-
// another "real" (non-MQTT) event
120-
console.log(`subscribed to ${topic}`);
121-
});
115+
.on('offline', () => {
116+
console.warn('client offline; reconnecting...');
117+
});
118+
119+
// yes, `buf` is still a Buffer.
120+
const suback = await client.subscribe('winken/+/nod', (buf, packet) => {
121+
console.log(`topic: "${packet.topic}", message: "${String(buf)}"`);
122+
}, {qos: 1});
123+
124+
console.log(`subscribed to ${suback.topic} w/ QoS ${suback.qos}`);
122125

123-
// note this breaks the `EventEmitter#emit` contract. this might change!
124-
await client.emit('phi/slamma/jamma', 'go local sports team', {qos: 2});
126+
await client.publish('winken/blinken/nod', 'foo');
125127
}
126128
```
127129

128130
## API
129131

130-
Basically it's [async-mqtt](https://npm.im/async-mqtt) except:
132+
Basically it's [async-mqtt](https://npm.im/async-mqtt) (which is [mqtt](https://npm.im/mqtt)) except:
131133

132-
- Use `on(topic, [opts], handler)` to subscribe
133-
- Use `removeListener(topic, handler)` to unsubscribe (only if there are no more active listeners on this topic)
134-
- Use `emit(topic, message, [opts])` to publish
135-
- Listen for event `suback` if you want "subscribe" confirmation
136-
- Listen for event `unsuback` if you want "unsubscribe" confirmation
137-
- Surprises!
134+
- Use `client.subscribe(topic, [opts], listener)` to *register a listener* for the topic.
135+
- `opts` are the standard options `mqtt.Client#subscribe()` supports
136+
- While `mqtt.Client#subscribe()` supports an `Array` of topics, our `topic` is singular, and must be a string.
137+
- Standard MQTT topic wildcards are supported, and listeners are executed first in order of specificity; i.e. `foo/bar` will take precedence over `foo/+` and `foo/+` will take precedence over `foo/#`.
138+
- Use `client.unsubscribe(topic, listener)` to remove the listener for the topic.
139+
- This will not necessarily *unsubscribe* from the topic (at the broker level), because there may be other listeners, but it *will* remove the listener.
140+
- `client.end()` won't throw a fit if already disconnected
138141

139142
## Roadmap
140143

141-
- [ ] Use ES modules so it can work in the browser (the whole point of this was to use it in the browser...)
142-
- [ ] Reconsider `emit()` `Proxy`
143-
- [x] Probably use [prettier](https://npm.im/prettier)
144+
- [ ] Something something Rollup?
144145

145146
## Maintainers
146147

lib/index.js

Lines changed: 74 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -5,188 +5,115 @@ const pify = require('pify');
55
const {EventEmitter2} = require('eventemitter2');
66

77
const eventify = topic => topic.replace(/#/g, '**').replace(/\+/g, '*');
8-
const mqttify = event => event.replace(/\*\*/g, '#').replace(/\*/g, '+');
9-
10-
/**
11-
* If the topic/event is not internal, then trigger a MQTT subscription.
12-
* @param {Set} clientEvents - Set of events to ignore
13-
* @param {string} methodName - Method name to monkeypatch (`on` or `once`)
14-
*/
15-
const eventProxy = (clientEvents, methodName) => (
16-
target,
17-
client,
18-
[topic, ...args]
19-
) => {
20-
// let original events pass thru
21-
if (clientEvents.has(topic)) {
22-
return target.apply(client, [topic, ...args]);
23-
}
24-
let [opts, listener] = args;
25-
if (typeof opts === 'function') {
26-
listener = opts;
27-
opts = {};
28-
}
29-
if (!listener) {
30-
throw new TypeError('Invalid parameters');
31-
}
32-
const ee2Topic = eventify(topic);
33-
client._topicEmitter[methodName](ee2Topic, listener);
34-
35-
// TODO: find a way to not subscribe to already-subscribed topics
36-
// (note that options such as QoS may differ, and in that case, we need to
37-
// re-subscribe)
38-
client
39-
.subscribe(topic, opts)
40-
.catch(err => {
41-
client._topicEmitter.removeListener(ee2Topic, listener);
42-
client.emit('error', err);
43-
})
44-
.then(suback => {
45-
client.emit('suback', suback);
46-
});
47-
48-
return client;
49-
};
50-
51-
const unsub = (client, topic) => {
52-
return client
53-
.unsubscribe(topic)
54-
.then(() => {
55-
client.emit('unsuback', {topic});
56-
})
57-
.catch(err => {
58-
client.emit('error', err);
59-
});
60-
};
61-
62-
/*
63-
*/
648

659
/**
6610
* Monkeypatches a `MqttClient` instance.
6711
* Promisifies `end`, `subscribe`, `publish`, and `unsubscribe`.
6812
* Adds special behavior around `on`, `once`, `removeListener`, `emit`, etc.
6913
* @param {MqttClient} client - MqttClient (does not mutate)
14+
* @param {Object} connack - Connection acknowledgment object
15+
* @param {boolean} connack.sessionPresent - If true, not clean session
7016
* @returns {MqttClient} Patched client
7117
*/
72-
const toadpatch = client => {
73-
client = pify(client, {
74-
include: ['end', 'subscribe', 'publish', 'unsubscribe']
18+
const toadpatch = (client, connack) => {
19+
const end = pify(client.end);
20+
const subscribe = pify(client.subscribe);
21+
const publish = pify(client.publish);
22+
const unsubscribe = pify(client.unsubscribe);
23+
24+
Object.defineProperty(client, 'sessionPresent', {
25+
value: Boolean(connack.sessionPresent)
7526
});
7627

77-
/**
78-
* Events that MqttClient actually uses internally
79-
* @type {Set}
80-
*/
81-
const clientEvents = new Set(
82-
client
83-
.eventNames()
84-
.concat([
85-
'suback',
86-
'unsuback',
87-
'packetsend',
88-
'packetreceive',
89-
'message',
90-
'offline',
91-
'close',
92-
'reconnect'
93-
])
94-
);
95-
9628
/**
9729
* Adapter between MQTT topics (supporting wildcards) and the client events.
9830
* @type {EventEmitter2}
9931
* @private
10032
*/
101-
client._topicEmitter = new EventEmitter2({
33+
client.toad = new EventEmitter2({
10234
wildcard: true,
10335
delimiter: '/'
104-
}).on('removeListener', event => {
105-
// if we're removing a listener for any reason, check to see if an
106-
// "unsubscribe" needs to happen (and do it)
107-
if (!client._topicEmitter.listenerCount(event)) {
108-
unsub(client, mqttify(event));
109-
}
11036
});
11137

11238
/**
113-
* This will subscribe to MQTT topics if the topic/event is *not* an internal
114-
* `MqttClient` or `mqttletoad` event. `EventEmitter#on`
39+
* Subscribe to a topic with a specific listener.
11540
* @public
116-
* @param {string} topic - MQTT topic (or event)
117-
* @param {Buffer|string|ArrayBuffer|*} [message] - Event data or MQTT
118-
* message. If the latter, must be one of `Buffer`, `string`, or
119-
* `ArrayBuffer`.
120-
* @param {Object} [opts] - Any options for MQTT subscription (like `qos`),
121-
* or more data for event
122-
* @param {*} [...args] - More data for event
123-
* @returns {MqttClient}
41+
* @function
42+
* @param {string} topic - MQTT topic
43+
* @param {Buffer|string} message - MQTT messqage
44+
* @param {Function} listener - Listener function; called with `topic`,
45+
* `message`, and raw `packet`
46+
* @param {Object} [opts] - Any options for MQTT subscription
47+
* @param {number} [opts.qos=0] - QoS
48+
* @returns Promise<{{topic, qos}}> Object w/ topic subscribed to and QoS
49+
* granted by broker
12450
*/
125-
client.on = new Proxy(client.on, {
126-
apply: eventProxy(clientEvents, 'on')
51+
client.subscribe = new Proxy(subscribe, {
52+
async apply(target, client, [topic, listener, opts = {}]) {
53+
if (typeof topic !== 'string' || typeof listener !== 'function') {
54+
throw new TypeError('Invalid parameters');
55+
}
56+
57+
const {toad} = client;
58+
const event = eventify(topic);
59+
toad.on(event, listener);
60+
61+
// TODO: find a way to not subscribe to already-subscribed topics
62+
// TODO: note that a different QoS requires a new subscription
63+
// TODO: even if the topic is identical!
64+
try {
65+
const result = await target.apply(client, [topic, opts]);
66+
return result.shift();
67+
} catch (err) {
68+
toad.removeListener(event, listener);
69+
throw err;
70+
}
71+
}
12772
});
12873

12974
/**
130-
* This will subscribe to MQTT topics if the topic/event is *not* an internal
131-
* `MqttClient` or `mqttletoad` event. Works like `EventEmitter#once`
75+
* Topic must match exactly.
76+
* Only unsubscribes at broker level if no more listeners are registered for
77+
* the topic.
13278
* @public
133-
* @param {string} topic - MQTT topic (or event)
134-
* @param {Buffer|string|ArrayBuffer|*} [message] - Event data or MQTT
135-
* message. If the latter, must be one of `Buffer`, `string`, or
136-
* `ArrayBuffer`.
137-
* @param {Object} [opts] - Any options for MQTT subscription (like `qos`),
138-
* or more data for event
139-
* @param {*} [...args] - More data for event
140-
* @returns {MqttClient}
79+
* @function
80+
* @param {string} topic - MQTT topic
81+
* @param {Function} listener - Listener function to remove
82+
* @returns {Promise<void>}
14183
*/
142-
client.once = new Proxy(client.once, {
143-
apply: eventProxy(clientEvents, 'once')
84+
client.unsubscribe = new Proxy(unsubscribe, {
85+
async apply(target, client, [topic, listener]) {
86+
const {toad} = client;
87+
const event = eventify(topic);
88+
toad.removeListener(event, listener);
89+
if (!toad.listenerCount(event)) {
90+
return target.apply(client, topic);
91+
}
92+
}
14493
});
14594

14695
/**
147-
* If the topic/event is *not* internal, remove it from the internal EE2
148-
* instance, which may cause an unsubscribe to happen. Works like
149-
* `EventEmitter#removeListener`.
96+
* Disconnects client (if connected)
97+
* @function
15098
* @public
151-
* @param {string} topic - MQTT topic (or event)
152-
* @param {Function} listener - Listener function to remove
153-
* @returns {MqttClient}
99+
* @returns {Promise<void>}
154100
*/
155-
client.removeListener = new Proxy(client.removeListener, {
156-
apply(target, client, [topic, listener]) {
157-
// let original events pass thru
158-
if (clientEvents.has(topic)) {
159-
return target.apply(client, [topic, listener]);
101+
client.end = new Proxy(end, {
102+
async apply(target, client, ...args) {
103+
if (client.connected) {
104+
return target.apply(client, ...args);
160105
}
161-
// remove it from internal EE2; if none remain for the topic, it
162-
// will be unsubscribed from.
163-
client._topicEmitter.removeListener(eventify(topic), listener);
164-
return client;
165106
}
166107
});
167108

168109
/**
169-
* Publishes a MQTT message if *not* an internal event.
170-
* This breaks the contract where `emit` should return the number of
171-
* listeners. We don't know the number of listeners, and the publishing
172-
* process is async anyway (though, arguably, not with QoS 0, but...zalgo).
173-
* @public
174-
* @param {string} topic - MQTT topic (or event)
175-
* @param {Buffer|string|ArrayBuffer} [message] - Message if MQTT; otherwise
176-
* optional if internal
177-
* @param {Object} [opts] - MQTT options or more data for internal event
178-
* @param {*} [...args] - More data for internal event (ignored by MQTT)
179-
* @returns {Promise<string>|number} Topic, once publish happens (depends on
180-
* QoS), or just the number of listeners if internal (yes, this is bad)
110+
* Publishes a message to a topic
111+
* @function
112+
* @returns {Promise<void>}
181113
*/
182-
client.emit = new Proxy(client.emit, {
183-
apply(target, client, [topic, ...args]) {
184-
// let original events pass thru
185-
if (clientEvents.has(topic)) {
186-
return target.apply(client, [topic, ...args]);
187-
}
188-
const [message, opts] = args;
189-
return client.publish(topic, message, opts).then(() => topic);
114+
client.publish = new Proxy(publish, {
115+
async apply(target, client, ...args) {
116+
return target.apply(client, ...args);
190117
}
191118
});
192119

@@ -195,7 +122,7 @@ const toadpatch = client => {
195122
* where the real listeners for subscriptions are stored.
196123
*/
197124
client.on('message', (topic, message, packet) => {
198-
client._topicEmitter.emit(eventify(topic), message, packet);
125+
client.toad.emit(eventify(topic), message, packet);
199126
});
200127

201128
return client;
@@ -211,8 +138,8 @@ const toadpatch = client => {
211138
exports.connect = (...args) => {
212139
const client = MQTT.connect(...args);
213140
return new Promise((resolve, reject) => {
214-
client.on('error', reject).on('connect', () => {
215-
resolve(toadpatch(client));
141+
client.on('error', reject).on('connect', connack => {
142+
resolve(toadpatch(client, connack));
216143
});
217144
});
218145
};

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
"unexpected": "^10.36.0"
5959
},
6060
"engines": {
61-
"node": ">=6.5"
61+
"node": ">=7.0"
6262
},
6363
"bugs": {
6464
"url": "https://github.com/boneskull/mqttltoad/issues"

0 commit comments

Comments
 (0)