Skip to content
This repository was archived by the owner on Oct 28, 2022. It is now read-only.

Commit 1ee3fac

Browse files
committed
feat(mitm): support "mitm" behavior
Supplying `{mitm: true}` to `connect()` in lieu of URL (or path) will assume [mitm](https://npm.im/mitm) is in use. A dummy port is handed to `net.createConnection()`. - swaps `pify` for `promwrap` - improved test harnesses - added pending tests where more coverage is needed
1 parent 407cdf2 commit 1ee3fac

File tree

5 files changed

+162
-92
lines changed

5 files changed

+162
-92
lines changed

lib/index.js

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict';
22

33
const MQTT = require('mqtt');
4-
const pify = require('pify');
4+
const promisify = require('promwrap');
55
const net = require('net');
66
const {EventEmitter2} = require('eventemitter2');
77
const decoders = require('./decoders');
@@ -26,7 +26,7 @@ const asyncMethodNames = ['publish', 'subscribe', 'unsubscribe', 'end'];
2626
*/
2727
const toadpatch = (client, baseOpts = {}) => {
2828
const asyncMethods = asyncMethodNames.reduce(
29-
(acc, name) => Object.assign(acc, {[name]: pify(client[name])}),
29+
(acc, name) => Object.assign(acc, {[name]: promisify(client[name])}),
3030
{}
3131
);
3232

@@ -147,7 +147,7 @@ const toadpatch = (client, baseOpts = {}) => {
147147

148148
const normalizeOptions = (opts = {}, defaults = DEFAULT_OPTS) => {
149149
[['decoder', decoders], ['encoder', encoders]].forEach(([prop, builtins]) => {
150-
if (opts.hasOwnProperty(prop)) {
150+
if (prop in opts) {
151151
if (typeof opts[prop] === 'string') {
152152
const value = builtins[opts[prop]];
153153
if (!value) {
@@ -181,9 +181,10 @@ exports.connect = async (url, opts = {}) => {
181181
opts = normalizeOptions(url);
182182
args = [opts];
183183
}
184+
const path = opts.mitm ? 1833 : opts.path;
184185
return new Promise((resolve, reject) => {
185-
(opts.path
186-
? MQTT.MqttClient(() => net.createConnection(opts.path), {
186+
(path
187+
? MQTT.MqttClient(() => net.createConnection(path), {
187188
resubscribe: false
188189
})
189190
: MQTT.connect(...args)

package-lock.json

Lines changed: 36 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
"dependencies": {
3232
"eventemitter2": "^4.1.2",
3333
"mqtt": "^2.15.1",
34-
"pify": "^3.0.0"
34+
"promwrap": "^2.1.0"
3535
},
3636
"files": [
3737
"lib"
@@ -50,11 +50,13 @@
5050
"get-port": "^3.2.0",
5151
"husky": "^0.14.3",
5252
"lint-staged": "^4.2.3",
53+
"mitm": "^1.3.3",
5354
"mocha": "^4.0.1",
5455
"mqtt-connection": "^3.1.0",
5556
"nyc": "^11.2.1",
5657
"prettier-eslint-cli": "^4.4.0",
5758
"semantic-release": "^8.0.3",
59+
"stoppable": "^1.0.5",
5860
"unexpected": "^10.36.0"
5961
},
6062
"engines": {

test/harness/index.js

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,48 @@
33
const _ = require('lodash');
44
const {Server} = require('net');
55
const MqttConnection = require('mqtt-connection');
6+
const MITM = require('mitm');
7+
const promisify = require('promwrap');
8+
const stoppable = require('stoppable');
69

7-
class Broker extends Server {
10+
class BaseServer extends Server {
811
constructor(listener) {
912
super();
10-
this.on('connection', stream => {
11-
this.emit('client', new MqttConnection(stream));
12-
});
13+
1314
if (listener) {
1415
this.on('client', listener);
1516
}
1617
}
1718
}
1819

19-
exports.Broker = Broker;
20+
class Broker extends BaseServer {
21+
constructor(listener) {
22+
super(listener);
23+
24+
this.on('connection', sock => {
25+
this.emit('client', new MqttConnection(sock));
26+
});
27+
}
28+
}
29+
30+
class MITMBroker extends BaseServer {
31+
listen(ignored, done) {
32+
this.mitm = MITM();
33+
this.mitm.on('connection', sock => {
34+
this.emit('client', new MqttConnection(sock));
35+
});
36+
process.nextTick(done);
37+
}
2038

21-
exports.createBroker = async (port, transformers = {}) => {
39+
close(done) {
40+
this.mitm.disable();
41+
process.nextTick(done);
42+
}
43+
}
44+
45+
exports.Broker = BaseServer;
46+
47+
exports.createBroker = async ({port, path, mitm, transformers = {}} = {}) => {
2248
transformers = _.defaults(transformers, {
2349
connack(...args) {
2450
return {returnCode: 0};
@@ -35,7 +61,8 @@ exports.createBroker = async (port, transformers = {}) => {
3561
pubrec: _.identity,
3662
publish: _.identity
3763
});
38-
const broker = new Broker(client => {
64+
65+
const listener = client => {
3966
client
4067
.on('connect', (...args) => {
4168
client.connack(transformers.connack(...args));
@@ -70,28 +97,17 @@ exports.createBroker = async (port, transformers = {}) => {
7097
break;
7198
}
7299
});
73-
})
74-
.on('close', () => {
75-
client.destroy();
76-
})
77-
.on('error', () => {
78-
client.destroy();
79-
})
80-
.on('timeout', () => {
81-
client.destroy();
82-
})
83-
.on('disconnect', () => {
84-
client.destroy();
85100
});
86-
});
101+
};
102+
103+
const broker = stoppable(new (mitm ? MITMBroker : Broker)(listener), 0);
87104
broker.transformers = transformers;
88105
broker.port = port;
89-
return new Promise((resolve, reject) => {
90-
broker.listen(port, err => {
91-
if (err) {
92-
return reject(err);
93-
}
94-
resolve(broker);
95-
});
106+
broker.path = path;
107+
const promisifiedBroker = promisify(broker, {
108+
exclude: ['unref', 'address', 'ref']
96109
});
110+
111+
await promisifiedBroker.listen(port || path);
112+
return promisifiedBroker;
97113
};

0 commit comments

Comments
 (0)