// MQTTServer.ts
import * as Aedes from 'aedes';
import { createServer, Server } from 'net';
import { createServer as createSecureServer } from 'tls';
import { MQTTServerOptions } from './MQTTServerOptions';
import { MQTTClient } from '../../client/service/MQTTClient';
import { MQTTClientOptions } from '../../client/service/MQTTClientOptions';
import { WebSocket } from 'ws';
import { Server as HTTPSServer, createServer as createServerHTTPS } from 'https';
import { Server as HTTPServer, createServer as createServerHTTP } from 'http';
/**
 * MQTT server.
 *
 * Hosts an Aedes broker on a local port and, once the transport server is
 * listening, connects to that broker through the inherited client logic
 * (`connect()`). The broker is exposed either over a plain/TLS socket or,
 * when `options.websocket` is set, over an HTTP/HTTPS server that is
 * upgraded to WebSocket connections.
 */
export class MQTTServer extends MQTTClient {
    protected options: MQTTServerOptions & MQTTClientOptions;
    /** Transport server (TCP, TLS, HTTP or HTTPS) the broker is attached to. */
    server: Server | HTTPSServer | HTTPServer;
    /** Aedes broker instance; created when the 'build' event fires. */
    aedes: Aedes;

    /**
     * @param options Server options. When omitted, the server uses port 1883.
     */
    constructor(options?: MQTTServerOptions) {
        super({
            url: '',
            ...(options || {
                port: 1883,
            }),
        });

        if (this.options.websocket) {
            // Defaults for the websocket case; anything already present in
            // the options is spread last and therefore takes precedence.
            this.options = {
                keepalive: 30,
                protocolId: 'MQTT',
                protocolVersion: 4,
                clean: true,
                reconnectPeriod: 1000,
                connectTimeout: 30 * 1000,
                ...this.options,
            };
        }

        // The URL the inherited client connects to always targets this host.
        // The port is read from the resolved options (not from the optional
        // constructor argument) so that it matches the port passed to
        // `listen()` and so that `new MQTTServer()` does not throw.
        // NOTE(review): a TLS socket server is still addressed as `mqtt://`;
        // confirm whether `mqtts://` is expected by the client here.
        const scheme = this.options.websocket ? `ws${this.options.tls ? 's' : ''}://` : `mqtt://`;
        this.options.url = scheme + `localhost:${this.options.port}`;

        // Drop the 'build' listeners registered so far (presumably the
        // client's own connect-on-build hook - verify against MQTTClient):
        // the connection is made from _onInitServer once the server listens.
        this.removeAllListeners('build');
        this.once('build', this._onInitServer.bind(this));
        this.once('destroy', this._onDestroy.bind(this));
    }

    /**
     * Create the broker and its transport server, start listening and then
     * connect the client to the broker.
     *
     * @returns Promise that resolves once the server is listening and
     *  `connect()` has succeeded. It rejects when the server emits an error
     *  before that point (e.g. the port cannot be bound) or when
     *  `connect()` fails.
     */
    private _onInitServer(): Promise<void> {
        return new Promise((resolve, reject) => {
            const brokerId = `BROKER_${process.pid}_${Math.random().toString(16).substring(2, 8)}`;
            this.aedes = new Aedes({
                id: brokerId,
                ...this.options,
            });

            this.aedes.on('subscribe', (subscriptions, client) => {
                this.model.logger(
                    'info',
                    'MQTT client [' +
                        (client ? client.id : client) +
                        '] subscribed to topics: ' +
                        subscriptions.map((s) => s.topic).join('\n') +
                        ' from broker ' +
                        brokerId,
                );
            });
            this.aedes.on('client', (client) => {
                this.model.logger(
                    'info',
                    'Client Connected: [' + (client ? client.id : client) + ']' + ' to broker ' + brokerId,
                );
            });

            if (this.options.websocket) {
                // Create websocket server
                if (this.options.tls) {
                    this.server = createServerHTTPS({
                        key: this.options.key,
                        cert: this.options.cert,
                    });
                } else {
                    this.server = createServerHTTP();
                }
                const wss = new WebSocket.Server({ server: this.server as HTTPSServer | HTTPServer });
                wss.on('connection', (ws) => {
                    // Hand every websocket to the broker as a duplex stream
                    const duplex = WebSocket.createWebSocketStream(ws);
                    this.aedes.handle(duplex);
                });
            } else {
                // Create socket server
                if (this.options.tls) {
                    this.server = createSecureServer(
                        {
                            key: this.options.key,
                            cert: this.options.cert,
                        },
                        this.aedes.handle,
                    );
                } else {
                    this.server = createServer(this.aedes.handle);
                }
            }

            this.model.logger('debug', `Starting server on port ${this.options.port}: ${brokerId} ...`);
            this.server.on('error', (error: Error) => {
                this.model.logger('error', error.message);
                // Fail the build when startup breaks instead of leaving the
                // promise pending forever. Once the promise has settled this
                // call has no effect, so later errors are only logged.
                reject(error);
            });
            this.server.on('listening', () => {
                this.model.logger('info', `Server listening on port ${this.options.port}: ${brokerId}`);
                this.connect()
                    .then(() => {
                        resolve();
                    })
                    .catch(reject);
            });
            this.server.listen(this.options.port);
        });
    }

    /**
     * Close the broker first and the transport server afterwards.
     *
     * @returns Promise that resolves when both are closed (or when nothing
     *  was ever started) and rejects with the error reported by the
     *  server's `close` callback.
     */
    private _onDestroy(): Promise<void> {
        return new Promise((resolve, reject) => {
            this.model.logger('info', `Closing MQTT server ...`);
            if (!this.aedes) {
                // Destroyed before the broker was built: nothing to close
                resolve();
                return;
            }
            this.aedes.close(() => {
                if (!this.server) {
                    // Broker existed but no transport server was created
                    resolve();
                    return;
                }
                this.server.close((err?: Error) => {
                    if (err) {
                        this.model.logger('error', `Unable to close socket server!`);
                        return reject(err);
                    }
                    this.model.logger('info', `MQTT server closed successfully!`);
                    resolve();
                });
            });
        });
    }
}