-
Notifications
You must be signed in to change notification settings - Fork 27
/
mqtt.ts
406 lines (368 loc) · 14.7 KB
/
mqtt.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
import * as mqtt from "mqtt";
import * as WebsocketUtils from "./ws";
import { Trie, TrieOp, Node as TrieNode } from "./trie";
import { BufferedEventEmitter } from "../common/event";
import { CrtError } from "../browser";
import { ClientBootstrap, SocketOptions } from "./io";
import { QoS, Payload, MqttRequest, MqttSubscribeRequest, MqttWill } from "../common/mqtt";
export { QoS, Payload, MqttRequest, MqttSubscribeRequest, MqttWill } from "../common/mqtt";
/** @category MQTT */
export type WebsocketOptions = WebsocketUtils.WebsocketOptions;
/** @category MQTT */
export type AWSCredentials = WebsocketUtils.AWSCredentials;
/**
* Configuration options for an MQTT connection
*
* @module aws-crt
* @category MQTT
*/
export interface MqttConnectionConfig {
/**
* ID to place in CONNECT packet. Must be unique across all devices/clients.
* If an ID is already in use, the other client will be disconnected.
*/
client_id: string;
/** Server name to connect to */
host_name: string;
/** Server port to connect to */
port: number;
/** Socket options, ignored in browser */
socket_options: SocketOptions;
/**
* Whether or not to start a clean session with each reconnect.
* If true, the server will forget all subscriptions with each reconnect.
* Set false to request that the server resume an existing session
* or start a new session that may be resumed after a connection loss.
* The `session_present` bool in the connection callback informs
* whether an existing session was successfully resumed.
* If an existing session is resumed, the server remembers previous subscriptions
* and sends messages (with QoS1 or higher) that were published while the client was offline.
*/
clean_session?: boolean;
/**
* The keep alive value, in seconds, to send in CONNECT packet.
* A PING will automatically be sent at this interval.
* The server will assume the connection is lost if no PING is received after 1.5X this value.
* This duration must be longer than {@link timeout}.
* If unset (or 0), this implementation uses 1200 seconds.
*/
keep_alive?: number;
/**
* Milliseconds to wait for ping response before client assumes
* the connection is invalid and attempts to reconnect.
* This duration must be shorter than {@link keep_alive}.
* Alternatively, TCP keep-alive via the `keep_alive` setting of {@link SocketOptions}
* may accomplish this in a more efficient (low-power) scenario,
* but keep-alive options may not work the same way on every platform and OS version.
*
* Note: this implementation hands the value to the underlying client as its
* connect timeout; if unset (or 0), 30000 milliseconds is used.
*/
timeout?: number;
/**
* Will to send with CONNECT packet. The will is
* published by the server when its connection to the client is unexpectedly lost.
*/
will?: MqttWill;
/** Username to connect with */
username?: string;
/** Password to connect with */
password?: string;
/** Options for the underlying websocket connection */
websocket?: WebsocketOptions;
/** AWS credentials, which will be used to sign the websocket request */
credentials?: AWSCredentials;
}
/**
 * MQTT client. Acts as the owner/factory for {@link MqttClientConnection} objects.
 *
 * @module aws-crt
 * @category MQTT
 */
export class MqttClient {
    /**
     * @param bootstrap Optional client bootstrap. The constructor body is empty,
     *                  so the value is accepted but not used here.
     */
    constructor(bootstrap?: ClientBootstrap) {
    }

    /**
     * Builds a new {@link MqttClientConnection} owned by this client.
     * @param config Settings describing the connection to create
     * @returns The freshly constructed connection
     */
    new_connection(config: MqttConnectionConfig) {
        const connection = new MqttClientConnection(this, config);
        return connection;
    }
}
/**
* Per-subscription callback, invoked with the topic and payload of a received message.
*
* @module aws-crt
* @category MQTT
*/
type SubscriptionCallback = (topic: string, payload: ArrayBuffer) => void;
/**
 * Trie keyed by MQTT topic levels (split on '/') that stores the callback
 * registered for each topic filter.
 *
 * @internal
 */
class TopicTrie extends Trie<SubscriptionCallback | undefined> {
    constructor() {
        // MQTT topic levels are separated by '/'
        super('/');
    }

    /**
     * Walks the trie one topic level at a time and returns the node for `key`.
     *
     * For lookups (any operation other than insert or delete) a level with no
     * exact child falls back to the wildcard children: a '#' child matches the
     * remainder of the topic and is returned immediately, otherwise a '+'
     * child matches this single level.
     *
     * Inserts and deletes address a topic *filter*, so they match every level
     * literally ('+' and '#' are ordinary keys). Expanding wildcards there
     * would make inserting "a/b" overwrite an existing "a/#" entry, and
     * deleting "a/b" remove an existing "a/+" entry.
     *
     * Known limitation: the walk never backtracks, so once an exact child has
     * been taken, a sibling wildcard branch is not tried if the exact branch
     * fails deeper down.
     *
     * @param key Topic (for lookups) or topic filter (for insert/delete)
     * @param op Operation being performed
     * @returns The matching node; a newly created node on insert; or undefined
     *          when nothing matches a lookup or delete
     */
    protected find_node(key: string, op: TrieOp) {
        const parts = this.split_key(key);
        // Only lookups may expand wildcards; see the doc comment above.
        const is_lookup = op != TrieOp.Insert && op != TrieOp.Delete;
        let current = this.root;
        let parent = undefined;
        for (const part of parts) {
            let child = current.children.get(part);
            if (!child && is_lookup) {
                // multi-level wildcard swallows the rest of the topic
                child = current.children.get('#');
                if (child) {
                    return child;
                }
                // single-level wildcard matches just this level
                child = current.children.get('+');
            }
            if (!child) {
                if (op == TrieOp.Insert) {
                    current.children.set(part, child = new TrieNode(part));
                }
                else {
                    return undefined;
                }
            }
            parent = current;
            current = child;
        }
        if (parent && op == TrieOp.Delete) {
            // unlink the exact node that was addressed
            parent.children.delete(current.key!);
        }
        return current;
    }
}
/**
 * Converts payload to a string regardless of the supplied type.
 *
 * * Strings (primitive or boxed) are returned unchanged.
 * * Binary payloads (`DataView`, typed arrays including Node `Buffer`, and
 *   `ArrayBuffer`) are decoded as UTF-8 text.
 * * Any other object is serialized with `JSON.stringify`.
 * * Anything else (e.g. a number handed in from untyped JavaScript) is
 *   converted with `String()`.
 *
 * @param payload The payload to convert
 * @returns The payload as a string
 * @internal
 */
function normalize_payload(payload: Payload): string {
    if (typeof payload === 'string') {
        return payload;
    }
    if (payload instanceof String) {
        // Boxed string: unwrap it so it is not JSON-quoted by the object branch below
        return payload.valueOf();
    }
    if (payload instanceof ArrayBuffer || ArrayBuffer.isView(payload)) {
        // Must run before the generic object branch: JSON.stringify would turn raw
        // bytes into an index map ({"0":104,...}) or "{}" instead of text
        return new TextDecoder('utf8').decode(payload);
    }
    if (payload instanceof Object) {
        // Convert payload to JSON string
        return JSON.stringify(payload);
    }
    return String(payload);
}
/**
 * MQTT client connection
 *
 * Wraps an mqtt.js client, translating its events into the events documented on
 * {@link MqttClientConnection.on} and routing incoming messages to the
 * callbacks registered through {@link MqttClientConnection.subscribe}.
 *
 * @module aws-crt
 * @category MQTT
 */
export class MqttClientConnection extends BufferedEventEmitter {
    /** Underlying mqtt.js client that carries all traffic for this connection */
    private connection: mqtt.MqttClient;
    /** Callbacks registered via {@link subscribe}, keyed by topic filter */
    private subscriptions = new TopicTrie();
    /** Number of 'connect' events seen so far; the first emits 'connect', later ones 'resume' */
    private connection_count = 0;

    /**
     * @param client The client that owns this connection
     * @param config The configuration for this connection
     */
    constructor(
        readonly client: MqttClient,
        private config: MqttConnectionConfig) {
        super();

        const create_websocket_stream = (client: mqtt.MqttClient) => WebsocketUtils.create_websocket_stream(this.config);
        const transform_websocket_url = (url: string, options: mqtt.IClientOptions, client: mqtt.MqttClient) => WebsocketUtils.create_websocket_url(this.config);

        // The will payload is normalized to a string, the same way publish() treats payloads
        const will = this.config.will ? {
            topic: this.config.will.topic,
            payload: normalize_payload(this.config.will.payload),
            qos: this.config.will.qos,
            retain: this.config.will.retain,
        } : undefined;

        // No URL transform is installed when the websocket protocol is 'wss-custom-auth'
        const websocketXform = (config.websocket || {}).protocol != 'wss-custom-auth' ? transform_websocket_url : undefined;

        this.connection = new mqtt.MqttClient(
            create_websocket_stream,
            {
                // service default is 1200 seconds
                keepalive: this.config.keep_alive ? this.config.keep_alive : 1200,
                clientId: this.config.client_id,
                connectTimeout: this.config.timeout ? this.config.timeout : 30 * 1000,
                clean: this.config.clean_session,
                username: this.config.username,
                password: this.config.password,
                // 0 turns off mqtt.js' own automatic reconnect
                reconnectPeriod: 0,
                will: will,
                transformWsUrl: websocketXform,
            }
        );

        this.connection.on('connect', this.on_connect);
        this.connection.on('error', this.on_error);
        this.connection.on('message', this.on_message);
        this.connection.on('offline', this.on_offline);
        this.connection.on('end', this.on_disconnected);
    }

    /** Emitted when the connection is ready and is about to start sending response data */
    on(event: 'connect', listener: (session_present: boolean) => void): this;

    /** Emitted when connection has closed successfully. */
    on(event: 'disconnect', listener: () => void): this;

    /**
     * Emitted when an error occurs
     * @param error - A CrtError containing the error that occurred
     */
    on(event: 'error', listener: (error: CrtError) => void): this;

    /**
     * Emitted when the connection is dropped unexpectedly. The error will contain the error
     * code and message.
     */
    on(event: 'interrupt', listener: (error: CrtError) => void): this;

    /**
     * Emitted when the connection reconnects. Only triggers on connections after the initial one.
     */
    on(event: 'resume', listener: (return_code: number, session_present: boolean) => void): this;

    /**
     * Emitted when any MQTT publish message arrives.
     */
    on(event: 'message', listener: (topic: string, payload: Buffer) => void): this;

    /** @internal */
    on(event: string | symbol, listener: (...args: any[]) => void): this {
        return super.on(event, listener);
    }

    /** Handles the underlying client's 'connect' event (CONNACK received) */
    private on_connect = (connack: mqtt.IConnackPacket) => {
        this.on_online(connack.sessionPresent);
    }

    /** Emits 'connect' for the first successful connection and 'resume' for every later one */
    private on_online = (session_present: boolean) => {
        if (++this.connection_count == 1) {
            this.emit('connect', session_present);
        } else {
            this.emit('resume', 0, session_present);
        }
    }

    /** Handles the underlying client's 'offline' event */
    private on_offline = () => {
        // NOTE(review): the 'interrupt' overload of on() declares a CrtError argument,
        // but a bare -1 is emitted here. Left unchanged to avoid breaking listeners.
        this.emit('interrupt', -1);
    }

    /** Handles the underlying client's 'end' event */
    private on_disconnected = () => {
        this.emit('disconnect');
    }

    /** Re-emits an error from the underlying client wrapped in a CrtError */
    private on_error = (error: Error) => {
        this.emit('error', new CrtError(error))
    }

    /**
     * Delivers an incoming message to the callback registered for its topic (if any),
     * then emits the generic 'message' event.
     */
    private on_message = (topic: string, payload: Buffer, packet: any) => {
        const callback = this.subscriptions.find(topic);
        if (callback) {
            callback(topic, payload);
        }
        this.emit('message', topic, payload);
    }

    /**
     * Open the actual connection to the server (async).
     * @returns A Promise which completes whether the connection succeeds or fails.
     * If connection fails, the Promise will reject with an exception.
     * If connection succeeds, the Promise will return a boolean that is
     * true for resuming an existing session, or false if the session is new
     */
    async connect() {
        // Deferred so that the listeners below are registered before uncork() runs
        // (presumably uncork() releases events held by BufferedEventEmitter - confirm there)
        setTimeout(() => { this.uncork() }, 0);
        return new Promise<boolean>((resolve, reject) => {
            // Whichever one-shot listener fires first removes the other, so neither
            // outcome leaves a stale listener (and its closure) on the client.
            const on_connect_success = (connack: mqtt.IConnackPacket) => {
                this.connection.removeListener('error', on_connect_error);
                resolve(connack.sessionPresent);
            };
            const on_connect_error = (error: Error) => {
                this.connection.removeListener('connect', on_connect_success);
                reject(new CrtError(error));
            };
            this.connection.once('connect', on_connect_success);
            this.connection.once('error', on_connect_error);
        });
    }

    /**
     * Alias for {@link connect}.
     * To cease connection attempts, call {@link disconnect}.
     * @deprecated
     */
    async reconnect() {
        return this.connect();
    }

    /**
     * Publish message (async).
     * If the device is offline, the PUBLISH packet will be sent once the connection resumes.
     *
     * @param topic Topic name
     * @param payload Contents of message
     * @param qos Quality of Service for delivering this message
     * @param retain If true, the server will store the message and its QoS so that it can be
     *               delivered to future subscribers whose subscriptions match the topic name
     * @returns Promise which returns a {@link MqttRequest} which will contain the packet id of
     *          the PUBLISH packet.
     *
     * * For QoS 0, completes as soon as the packet is sent.
     * * For QoS 1, completes when PUBACK is received.
     * * For QoS 2, completes when PUBCOMP is received.
     */
    async publish(topic: string, payload: Payload, qos: QoS, retain: boolean = false): Promise<MqttRequest> {
        let payload_data = normalize_payload(payload);
        return new Promise((resolve, reject) => {
            this.connection.publish(topic, payload_data, { qos: qos, retain: retain }, (error, packet) => {
                if (error) {
                    // Reject the caller's promise and also surface the failure as an 'error' event
                    reject(new CrtError(error));
                    return this.on_error(error);
                }
                resolve({ packet_id: (packet as mqtt.IPublishPacket).messageId })
            });
        });
    }

    /**
     * Subscribe to a topic filter (async).
     * The client sends a SUBSCRIBE packet and the server responds with a SUBACK.
     *
     * subscribe() may be called while the device is offline, though the async
     * operation cannot complete successfully until the connection resumes.
     *
     * Once subscribed, `callback` is invoked each time a message matching
     * the `topic` is received. It is possible for such messages to arrive before
     * the SUBACK is received.
     *
     * @param topic Subscribe to this topic filter, which may include wildcards
     * @param qos Maximum requested QoS that server may use when sending messages to the client.
     *            The server may grant a lower QoS in the SUBACK
     * @param on_message Optional callback invoked when message received.
     * @returns Promise which returns a {@link MqttSubscribeRequest} which will contain the
     *          result of the SUBSCRIBE. The Promise resolves when a SUBACK is returned
     *          from the server or is rejected when an exception occurs.
     */
    async subscribe(topic: string, qos: QoS, on_message?: (topic: string, payload: ArrayBuffer) => void): Promise<MqttSubscribeRequest> {
        // Registered up front so messages arriving before the SUBACK still reach the callback
        this.subscriptions.insert(topic, on_message);
        return new Promise((resolve, reject) => {
            this.connection.subscribe(topic, { qos: qos }, (error, packet) => {
                if (error) {
                    reject(new CrtError(error))
                    return this.on_error(error);
                }
                // The grant list can be empty (e.g. the underlying client skipped the
                // SUBSCRIBE because the topic was already subscribed). Fall back to the
                // requested values instead of dereferencing undefined, which would
                // throw inside this callback and leave the promise pending forever.
                const grants = packet as mqtt.ISubscriptionGrant[];
                const sub = grants && grants[0];
                if (!sub) {
                    resolve({ topic: topic, qos: qos });
                    return;
                }
                resolve({ topic: sub.topic, qos: sub.qos });
            });
        });
    }

    /**
     * Unsubscribe from a topic filter (async).
     * The client sends an UNSUBSCRIBE packet, and the server responds with an UNSUBACK.
     * @param topic The topic filter to unsubscribe from. May contain wildcards.
     * @returns Promise which returns a {@link MqttRequest} which will contain the packet id
     *          of the UNSUBSCRIBE packet being acknowledged. Promise is resolved when an
     *          UNSUBACK is received from the server or is rejected when an exception occurs.
     */
    async unsubscribe(topic: string): Promise<MqttRequest> {
        // The local callback is dropped immediately, before the server acknowledges
        this.subscriptions.remove(topic);
        return new Promise((resolve, reject) => {
            this.connection.unsubscribe(topic, undefined, (error, packet) => {
                if (error) {
                    reject(new CrtError(error));
                    return this.on_error(error);
                }
                resolve({ packet_id: (packet as mqtt.IUnsubackPacket).messageId });
            });
        });
    }

    /**
     * Close the connection (async).
     * @returns Promise which completes when the connection is closed.
     */
    async disconnect() {
        return new Promise<void>((resolve) => {
            this.connection.end(undefined, undefined, () => {
                resolve();
            })
        });
    }
}