-
Notifications
You must be signed in to change notification settings - Fork 0
/
libBot.ts
373 lines (319 loc) · 11.4 KB
/
libBot.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
import * as tc from "./transcoder";
import { stringify as serializerStringify, parse as serializerParse } from "./serializer";
/**
 * Largest sequence number that is put on the wire. It is shrunk to 100 when
 * NODE_ENV is "test"; otherwise it is 65535 (2^16 - 1).
 */
const SEQ_MAX = process.env.NODE_ENV === "test" ? 100 : 0xffff;
/** Wire sequence numbers below this value lie in the lowest tenth of the range. */
const SEQ_LOWER = Math.floor(0.1 * SEQ_MAX);
/** Wire sequence numbers above this value lie in the highest tenth of the range. */
const SEQ_UPPER = Math.floor(0.9 * SEQ_MAX);
/**
 * Folds an ever-growing sequence number into the wire range.
 * For n >= 1 the result lies in 1..SEQ_MAX (SEQ_MAX maps to SEQ_MAX, SEQ_MAX + 1 maps to 1);
 * n = 0 yields 0.
 */
function adaptOffset(n: number) {
  const zeroBased = (n - 1) % SEQ_MAX;
  return zeroBased + 1;
}
/**
* Options accepted by the LibBot constructor. Every field is optional.
*/
export interface LibBotOptions {
/**
* When a delivery failure is detected while processing the acks of an incoming message (in receiveMessage), immediately return all messages that have not been successfully delivered so they can be sent again.
* Default: false
*/
autoRetransmit?: boolean;
/**
* Automatically return acks (by internally calling sendAcks) when calling receiveMessage if n or more messages have been received since last sending acks.
* Default: off
*/
autoAckAfterMessages?: number;
/**
* Automatically return acks (by internally calling sendAcks) when calling receiveMessage if n or more incoming messages have been lost before being received.
* Default: off
*/
autoAckOnFailedMessages?: number;
/**
* Serialized state of an old LibBot instance (as returned by getLibState). If present, the other options are ignored and the options stored in that state are used.
*/
restoreState?: string;
}
/**
* One entry of the received-messages list returned by LibBot.receiveMessage.
*/
export interface ReceivedMessage {
/**
* The message payload.
*/
msg: Buffer;
/**
* Type of message (see ReceivedMessageType). If unspecified it is treated as "full".
*/
type?: ReceivedMessageType;
}
/**
* Classification that LibBot.receiveMessage attaches to every message it returns.
*/
export enum ReceivedMessageType {
/**
* The message that was just received, and it is also the next one in sequence.
*/
full = "full",
/**
* A message received in an earlier call that is emitted now, in sequence order, because the gap before it has been filled.
*/
ordered = "ordered",
/**
* The message that was just received, but earlier messages are still missing, so it is ahead of the sequence.
*/
unordered = "unordered",
}
/**
 * Provides delivery and ordering guarantees for given Buffer messages with minimal overhead and
 * few additional messages (depending on the options).
 *
 * Internally every message is numbered with an ever-growing sequence number; on the wire that
 * number is folded into a bounded range by adaptOffset. Wire sequence number 0 is reserved for
 * ack-only messages (see sendAcks).
 */
export default class LibBot {
  private readonly options: LibBotOptions;

  /**
   * Received but not emitted (waiting for correct order).
   */
  private readonly received: Map<number, Buffer>;

  /**
   * Sent and unacknowledged. `maxAck` is the highest incoming seq that was acknowledged
   * inside that outgoing message (if it carried acks).
   */
  private readonly sent: Map<
    number,
    {
      buf: Buffer;
      maxAck?: number;
    }
  >;

  /**
   * True if LibBot is currently looping seq numbers.
   */
  private inTransition: boolean;

  /**
   * Known failed delivery.
   */
  private readonly sendFail: Map<number, Buffer>;

  /**
   * Highest sent seq known to have been received (may be missing some messages).
   */
  maxSendSeqKnownReceived: number;

  /**
   * Highest seq received so far.
   */
  maxIncSeq: number;

  /**
   * Highest ack that was sent and is known to have been received.
   */
  maxSendAckKnownReceived: number;

  /**
   * Maximum sequence number sent so far.
   */
  maxSendSeq: number;

  /**
   * Highest incoming seq that has been acknowledged in an outgoing message so far.
   */
  maxSendAck: number;

  /**
   * Highest received and emitted (received all messages up to this point).
   */
  maxEmittedSeq: number;

  /**
   * Number of completed wrap-arounds of the incoming wire sequence numbers.
   */
  private recSeqOffset: number;

  /**
   * @param options Behaviour switches. When `restoreState` is set, the whole instance
   * (including its options) is rebuilt from that serialized state instead.
   */
  constructor(options: LibBotOptions = {}) {
    if (options.restoreState) {
      const rs = serializerParse(options.restoreState);
      this.options = rs.options;
      this.received = rs.received;
      this.sent = rs.sent;
      this.sendFail = rs.sendFail;
      this.maxIncSeq = rs.maxIncSeq;
      this.maxSendAckKnownReceived = rs.maxSendAckKnownReceived;
      this.maxSendSeq = rs.maxSendSeq;
      this.maxSendAck = rs.maxSendAck;
      // Fixed: this used to be restored from rs.maxSendAckKnownReceived (wrong field).
      this.maxSendSeqKnownReceived = rs.maxSendSeqKnownReceived;
      this.maxEmittedSeq = rs.maxEmittedSeq;
      this.inTransition = rs.inTransition;
      this.recSeqOffset = rs.recSeqOffset;
    } else {
      this.options = options;
      this.received = new Map();
      this.sent = new Map();
      this.sendFail = new Map();
      this.maxIncSeq = 0;
      this.maxSendAckKnownReceived = 0;
      this.maxSendSeq = 0;
      this.maxSendAck = 0;
      this.maxSendSeqKnownReceived = 0;
      this.maxEmittedSeq = 0;
      this.inTransition = false;
      this.recSeqOffset = 0;
    }
  }

  /**
   * Serializes this instance so that it can later be passed as `restoreState`.
   */
  getLibState(): string {
    return serializerStringify(this);
  }

  /**
   * Number of outgoing messages that are known to have been lost after sending (from received acks).
   */
  get failedSendMessageCount() {
    return this.sendFail.size;
  }

  /**
   * Number of incoming messages that are known to have been lost (from received seq).
   */
  get failedReceiveMessageCount() {
    return this.maxIncSeq - this.maxEmittedSeq - this.received.size;
  }

  /**
   * Messages that have been sent, but not yet acked.
   */
  get unackedMessageCount() {
    return this.sent.size;
  }

  /**
   * Send a message to the other side. The current acks are piggybacked on the message.
   * @param buf Message to send (defaults to an empty buffer)
   * @returns Encoded message to forward to the other side
   */
  send(buf = Buffer.allocUnsafe(0)): Buffer {
    this.maxSendSeq++;
    const acks = this.getAcks();
    const msg = tc.encodeSeqAck(adaptOffset(this.maxSendSeq), acks, buf);
    let maxAck: number | undefined;
    if (acks.length > 0) {
      // acks[0] is a wire value; rebuild the unfolded seq it stands for.
      maxAck = acks[0] + this.recSeqOffset * SEQ_MAX;
      if (this.inTransition && acks[0] < SEQ_LOWER) maxAck += SEQ_MAX;
    }
    this.recordAcksSent(acks);
    this.sent.set(this.maxSendSeq, {
      buf,
      maxAck,
    });
    return msg;
  }

  /**
   * Resend messages that are known to have been lost. The messages are moved back into the
   * set of sent-but-unacknowledged messages; no acks are attached to them.
   * @returns Encoded messages to forward to the other side
   */
  sendFailedMessages(): Array<Buffer> {
    // TODO: add options: force retransmit, add acks (to none, all, some?)
    const toSend: Array<Buffer> = [];
    this.sendFail.forEach((payload, seq) => {
      toSend.push(tc.encodeSeqAck(adaptOffset(seq), [], payload));
      this.sent.set(seq, {
        buf: payload,
      });
    });
    this.sendFail.clear();
    return toSend;
  }

  /**
   * Build an empty message (wire seq 0) that contains only acks.
   * @returns Encoded message to forward to the other side
   */
  sendAcks(): Buffer {
    const acks = this.getAcks();
    this.recordAcksSent(acks);
    return tc.encodeSeqAck(0, acks);
  }

  /**
   * Get the current array of acknowledgements (wire values). The first entry is the highest
   * seq received so far; the remaining entries are the seqs below it that are still missing.
   * The array is empty when the other side is already known to have our latest ack.
   * @returns acks
   */
  getAcks(): Array<number> {
    const acks: Array<number> = [];
    if (this.maxIncSeq > this.maxSendAckKnownReceived) {
      // some acks must be sent
      acks.push(adaptOffset(this.maxIncSeq));
      for (let i = this.maxEmittedSeq + 1; i <= this.maxIncSeq; i++) {
        if (!this.received.has(i)) {
          acks.push(adaptOffset(i));
        }
      }
    }
    return acks;
  }

  /**
   * A new message has been received from the other side.
   * @param buf Encoded message as produced by the other side
   * @returns A tuple: the messages to send to the other side, and the processed received
   * messages. The second element is null when the incoming message was ack-only or its seq
   * had already been emitted.
   */
  receiveMessage(buf: Buffer): [Array<Buffer>, Array<ReceivedMessage> | null] {
    const output: Array<Buffer> = [];
    const [wireSeq, wireAcks, payload] = tc.decodeSeqAck(buf);

    // Wire seq 0 marks an ack-only message. It carries no data and must not take part in
    // wrap-around detection (otherwise it would be mistaken for an early message of the
    // next round whenever maxEmittedSeq is near the top of the range).
    const ackOnly = wireSeq === 0;
    let seq = wireSeq;
    if (!ackOnly) {
      // NOTE(review): this check can also fire while maxEmittedSeq has not wrapped yet
      // (e.g. a late message from the old round) — confirm this is intended.
      if (this.inTransition && seq > adaptOffset(this.maxEmittedSeq)) {
        // transition stage has ended
        this.inTransition = false;
        this.recSeqOffset++;
      }
      if (seq < SEQ_LOWER && adaptOffset(this.maxEmittedSeq) > SEQ_UPPER) {
        // transition stage: a low wire seq while we are still near the top of the range
        this.inTransition = true;
        seq += (this.recSeqOffset + 1) * SEQ_MAX;
      } else {
        seq += this.recSeqOffset * SEQ_MAX;
      }
    }

    // Unfold the wire acks relative to the highest sent seq known to have been received.
    // NOTE(review): a missing-ack entry that lies below maxSendSeqKnownReceived is shifted
    // up by SEQ_MAX as well, so it no longer matches its key in `sent` — confirm intent.
    const roundBase = Math.floor(this.maxSendSeqKnownReceived / SEQ_MAX) * SEQ_MAX;
    const acks: Array<number> = wireAcks.map((wireAck: number) => {
      const ack = wireAck + roundBase;
      return ack < this.maxSendSeqKnownReceived ? ack + SEQ_MAX : ack;
    });

    // Process acks
    const [maxAck, ...missingAcks] = acks;
    if (maxAck) {
      if (maxAck > this.maxSendSeqKnownReceived) {
        this.maxSendSeqKnownReceived = maxAck;
      }
      const missing = new Set(missingAcks);
      this.sent.forEach((entry, sentSeq) => {
        if (missing.has(sentSeq) || sentSeq > maxAck) {
          // delivery failed: set up for redelivery
          this.sendFail.set(sentSeq, entry.buf);
        } else if (entry.maxAck && entry.maxAck > this.maxSendAckKnownReceived) {
          // delivered, so the acks it carried have arrived too
          this.maxSendAckKnownReceived = entry.maxAck;
        }
      });
      this.sent.clear();
      if (this.options.autoRetransmit && this.failedSendMessageCount > 0) {
        for (const retransmit of this.sendFailedMessages()) {
          output.push(retransmit);
        }
      }
    }

    if (ackOnly || seq <= this.maxEmittedSeq) {
      // Nothing new to deliver. Still hand back `output`: it may hold retransmissions that
      // sendFailedMessages() has already moved out of sendFail.
      return [output, null];
    }

    if (seq > this.maxIncSeq) {
      this.maxIncSeq = seq;
    }
    this.received.set(seq, payload);

    // emit messages that are in sequence
    const orderedMessages: Array<Buffer> = [];
    let next = this.received.get(this.maxEmittedSeq + 1);
    while (next !== undefined) {
      this.maxEmittedSeq++;
      orderedMessages.push(next);
      this.received.delete(this.maxEmittedSeq);
      next = this.received.get(this.maxEmittedSeq + 1);
    }

    // Send acks if autoAckAfterMessages messages have been received without an
    // acknowledgement being sent, or if enough incoming messages are known to be lost.
    if (
      (this.options.autoAckAfterMessages &&
        this.maxIncSeq - this.maxSendAck >= this.options.autoAckAfterMessages) ||
      (this.options.autoAckOnFailedMessages &&
        this.failedReceiveMessageCount >= this.options.autoAckOnFailedMessages)
    ) {
      output.push(this.sendAcks());
    }

    // The just-received payload is reported first (unless empty), followed by any
    // previously buffered messages that have now become deliverable in order.
    const outputReceivedMessages: Array<ReceivedMessage> = [];
    if (payload.length > 0) {
      outputReceivedMessages.push({
        msg: payload,
        type: orderedMessages.includes(payload) ? ReceivedMessageType.full : ReceivedMessageType.unordered,
      });
    }
    for (const msg of orderedMessages) {
      if (msg === payload) continue;
      outputReceivedMessages.push({
        msg,
        type: ReceivedMessageType.ordered,
      });
    }
    return [output, outputReceivedMessages];
  }

  /**
   * Remember that the given (non-empty) acks were handed out for sending, so that
   * autoAckAfterMessages counts messages received since the last acks were sent.
   */
  private recordAcksSent(acks: Array<number>): void {
    if (acks.length > 0 && this.maxIncSeq > this.maxSendAck) {
      this.maxSendAck = this.maxIncSeq;
    }
  }
}