/
causal_message_buffer.ts
344 lines (316 loc) · 10.5 KB
/
causal_message_buffer.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
import { MessageMeta, int64AsNumber, nonNull } from "@collabs/core";
import { CausalMessageBufferSave } from "../../generated/proto_compiled";
import { CRDTMessageMeta } from "./crdt_meta";
import { LoadCRDTMeta, ReceiveCRDTMeta } from "./crdt_meta_implementations";
import { MessageSerializer } from "./message_serializer";
/**
 * One received-but-not-yet-delivered message, as stored in
 * CausalMessageBuffer's internal buffer.
 *
 * The four fields are exactly the four arguments that are later passed
 * to the buffer's `deliver` callback.
 */
interface ReceivedMessage {
// OPT: dedupe message and messageStacks. Perhaps parse meta first,
// then wait to parse messageStacks until you're ready to deliver?
/** The original serialized message. */
message: Uint8Array;
/**
 * The message stacks that accompany `message`. When an entry is created
 * by load(), these come from deserializing `message`.
 */
messageStacks: (Uint8Array | string)[][];
/**
 * The message's metadata. Its `runtimeExtra` is read as the CRDT
 * metadata that decides whether the message is ready for delivery.
 */
meta: MessageMeta;
/** Opaque value forwarded unchanged to the `deliver` callback. */
caller: unknown;
}
/**
 * Holds remotely received messages until they can be delivered in
 * causal order; used by [[CRuntime]].
 *
 * It also maintains the CRDT metadata that delivery depends on: the
 * vector clock, the causally maximal vector clock keys, and the
 * Lamport timestamp.
 */
export class CausalMessageBuffer {
  /**
   * Vector clock, mapping each replicaID to the latest senderCounter
   * recorded for it. A replica with no entry counts as 0.
   *
   * Our own replicaID always has an entry, starting at 0.
   *
   * Treat as read-only outside this class.
   */
  readonly vc = new Map<string, number>();

  /**
   * Vector clock keys that are currently causally maximal.
   *
   * Our own replicaID is never a member, even if we are causally maximal.
   * When causalityGuaranteed is set, this set stays empty.
   *
   * Treat as read-only outside this class.
   */
  readonly maximalVCKeys = new Set<string>();

  /**
   * The Lamport timestamp.
   *
   * It starts at 0, but tick() increments it, so any local transaction
   * that uses it sees a positive value.
   *
   * Treat as read-only outside this class.
   */
  lamportTimestamp = 0;

  /**
   * Messages that were received but not delivered yet, either because
   * they are not causally ready or because check() has not run since
   * they became ready.
   *
   * Keys are produced by encodeDot.
   */
  private readonly buffer = new Map<string, ReceivedMessage>();

  /**
   * @param replicaID Our own replica's ID.
   * @param causalityGuaranteed If true, every message is treated as
   * ready for delivery and maximalVCKeys is not maintained.
   * @param deliver Callback invoked to actually process a message once
   * it is causally ready.
   */
  constructor(
    private readonly replicaID: string,
    private readonly causalityGuaranteed: boolean,
    private readonly deliver: (
      message: Uint8Array,
      messageStacks: (Uint8Array | string)[][],
      meta: MessageMeta,
      caller: unknown | undefined
    ) => void
  ) {
    // Insert our own entry before any other so that it is the first
    // entry in vc's iteration order; save() and load() rely on that.
    this.vc.set(this.replicaID, 0);
  }

  /**
   * @returns The buffer key for a message: its senderCounter and
   * senderID joined by a comma.
   */
  private encodeDot(crdtMeta: CRDTMessageMeta): string {
    const { senderCounter, senderID } = crdtMeta;
    return `${senderCounter},${senderID}`;
  }

  /**
   * Handles one remote message:
   * - already delivered: ignored;
   * - causally ready: delivered immediately;
   * - otherwise: stored in the buffer (unless it is already there).
   *
   * @param message Must be a transaction-message, not a merged-message.
   * @returns Whether the message was delivered by this call.
   */
  process(
    message: Uint8Array,
    messageStacks: (Uint8Array | string)[][],
    meta: MessageMeta,
    caller: unknown
  ): boolean {
    const crdtMeta = meta.runtimeExtra as ReceiveCRDTMeta;

    if (this.isAlreadyDelivered(crdtMeta)) return false;

    if (!this.isReady(crdtMeta)) {
      // Not deliverable yet: remember it, keeping any existing copy.
      const dot = this.encodeDot(crdtMeta);
      if (!this.buffer.has(dot)) {
        this.buffer.set(dot, { message, messageStacks, meta, caller });
      }
      return false;
    }

    this.deliver(message, messageStacks, meta, caller);
    this.processRemoteDelivery(crdtMeta);
    return true;
  }

  /**
   * Delivers every buffered message that is (or becomes) causally ready,
   * and discards buffered messages that were already delivered.
   *
   * @returns Whether at least one message was delivered.
   */
  check(): boolean {
    let deliveredAny = false;
    // A delivery can make messages visited earlier in the same pass
    // ready, so keep sweeping until a pass delivers nothing.
    while (this.sweepBuffer()) {
      deliveredAny = true;
    }
    return deliveredAny;
  }

  /**
   * Makes a single pass over the buffer, delivering ready messages and
   * dropping already-delivered ones.
   *
   * @returns Whether this pass delivered anything.
   */
  private sweepBuffer(): boolean {
    let deliveredInPass = false;
    for (const [dot, pending] of this.buffer) {
      const crdtMeta = pending.meta.runtimeExtra as ReceiveCRDTMeta;
      if (this.isReady(crdtMeta)) {
        // Remove it from the buffer before handing it to deliver.
        this.buffer.delete(dot);
        this.deliver(
          pending.message,
          pending.messageStacks,
          pending.meta,
          pending.caller
        );
        this.processRemoteDelivery(crdtMeta);
        deliveredInPass = true;
      } else if (this.isAlreadyDelivered(crdtMeta)) {
        // Stale entry: nothing left to do with it.
        this.buffer.delete(dot);
      }
    }
    return deliveredInPass;
  }

  /**
   * Yields the first maximalVCKeyCount entries of crdtMeta.vcEntries,
   * not counting the sender's own entry (maximalVCKeyCount omits it).
   */
  private *maximalEntries(crdtMeta: ReceiveCRDTMeta) {
    let yielded = 0;
    for (const entry of crdtMeta.vcEntries) {
      // The sender's entry is skipped without being counted.
      if (entry[0] === crdtMeta.senderID) continue;
      if (yielded === crdtMeta.maximalVCKeyCount) return;
      yield entry;
      yielded++;
    }
  }

  /**
   * @returns Whether a message with the given crdtMeta may be delivered
   * now without violating the causal order.
   */
  private isReady(crdtMeta: ReceiveCRDTMeta): boolean {
    if (this.causalityGuaranteed) return true;

    // The message must be the very next one from its sender.
    const seenFromSender = this.vc.get(crdtMeta.senderID) ?? 0;
    if (seenFromSender !== crdtMeta.senderCounter - 1) return false;

    // Every other causally maximal entry must be <= our own entry.
    for (const [key, value] of this.maximalEntries(crdtMeta)) {
      if ((this.vc.get(key) ?? 0) < value) return false;
    }
    return true;
  }

  /**
   * @returns Whether our vector clock already covers the message with
   * the given sender and senderCounter.
   */
  private isAlreadyDelivered(crdtMeta: ReceiveCRDTMeta): boolean {
    const seenFromSender = this.vc.get(crdtMeta.senderID);
    return (
      seenFromSender !== undefined && seenFromSender >= crdtMeta.senderCounter
    );
  }

  /**
   * Updates our metadata after a remote message has been delivered.
   */
  private processRemoteDelivery(crdtMeta: ReceiveCRDTMeta) {
    if (!this.causalityGuaranteed) {
      // Drop the keys that this message causally dominates...
      for (const [key, value] of this.maximalEntries(crdtMeta)) {
        if (this.vc.get(key) === value) {
          this.maximalVCKeys.delete(key);
        }
      }
      // ...and record the sender as maximal. The message is remote, so
      // senderID is known not to be our own ID.
      this.maximalVCKeys.add(crdtMeta.senderID);
    }

    this.vc.set(crdtMeta.senderID, crdtMeta.senderCounter);

    // The Lamport timestamp is only merged when the message carries one.
    // Skipping it otherwise technically violates the definition of a
    // Lamport timestamp, but the result is still causally-compatible
    // because delivery is in causal order.
    const incomingLamport = crdtMeta.lamportTimestamp ?? 0;
    this.lamportTimestamp = Math.max(this.lamportTimestamp, incomingLamport);
  }

  /**
   * Advances our metadata for a new local transaction.
   */
  tick() {
    const ownCounter = nonNull(this.vc.get(this.replicaID));
    this.vc.set(this.replicaID, ownCounter + 1);

    if (!this.causalityGuaranteed) {
      // Our own message causally dominates every current key.
      this.maximalVCKeys.clear();
    }

    this.lamportTimestamp++;
  }

  /**
   * Serializes the vector clock, maximal keys, Lamport timestamp and the
   * original bytes of every buffered message.
   *
   * @returns The encoded CausalMessageBufferSave.
   */
  save(): Uint8Array {
    // vc iterates in insertion order and our replicaID was inserted
    // first, so it ends up in vcKeys[0].
    const vcKeys = [...this.vc.keys()];
    const vcValues = [...this.vc.values()];
    const bufferMessages = Array.from(
      this.buffer.values(),
      (pending) => pending.message
    );

    return CausalMessageBufferSave.encode(
      CausalMessageBufferSave.create({
        vcKeys,
        vcValues,
        maximalVcKeys: [...this.maximalVCKeys],
        lamportTimestamp: this.lamportTimestamp,
        bufferMessages,
      })
    ).finish();
  }

  /**
   * Merges a saved state (as produced by save()) into our own state.
   *
   * @param savedState The encoded CausalMessageBufferSave to merge.
   * @param caller Stored with each buffered message taken from
   * savedState, and later passed to the deliver callback.
   * @returns A LoadCRDTMeta built from the first saved vc key, our
   * vector clock and Lamport timestamp as they were before merging, and
   * the saved vector clock and Lamport timestamp.
   */
  load(savedState: Uint8Array, caller: unknown): LoadCRDTMeta {
    // Snapshot our state before anything is merged into it.
    const oldLocalVC = new Map(this.vc);
    const oldLocalLamportTimestamp = this.lamportTimestamp;

    const decoded = CausalMessageBufferSave.decode(savedState);

    const remoteVC = new Map<string, number>();
    for (const [index, key] of decoded.vcKeys.entries()) {
      remoteVC.set(key, int64AsNumber(decoded.vcValues[index]));
    }
    const remoteMaximalVCKeys = new Set(decoded.maximalVcKeys);

    // 1. Remove our maximal entries that are causally dominated by the
    // remote VC, except those also present in the saved state.
    // (Entries are compared, not just keys: the values must match for an
    // entry to count as present on both sides.)
    for (const key of this.maximalVCKeys) {
      const ours = nonNull(this.vc.get(key));
      const theirs = remoteVC.get(key) ?? 0;
      const inIntersection = remoteMaximalVCKeys.has(key) && ours === theirs;
      if (inIntersection) continue;
      if (theirs >= ours) this.maximalVCKeys.delete(key);
    }

    // 2. Adopt the remote maximal entries that our VC does not
    // causally dominate.
    for (const key of remoteMaximalVCKeys) {
      const ours = this.vc.get(key) ?? 0;
      const theirs = nonNull(remoteVC.get(key));
      if (ours < theirs) this.maximalVCKeys.add(key);
    }

    // Our own replicaID must never be listed as a maximal key.
    this.maximalVCKeys.delete(this.replicaID);

    // Entry-wise maximum of the two vector clocks.
    remoteVC.forEach((theirs, key) => {
      this.vc.set(key, Math.max(this.vc.get(key) ?? 0, theirs));
    });

    const remoteLamportTimestamp = int64AsNumber(decoded.lamportTimestamp);
    this.lamportTimestamp = Math.max(
      this.lamportTimestamp,
      remoteLamportTimestamp
    );

    // Blindly merge the buffers for now. CRuntime will call check() later
    // to process any newly-ready messages (local or remote) and to
    // delete already-received messages.
    for (const message of decoded.bufferMessages) {
      // Buffer messages are always transaction-messages (not merged),
      // so only the first deserialized element is used.
      const deserialized = MessageSerializer.instance.deserialize(message)[0];
      const { messageStacks, meta } = deserialized;
      const dot = this.encodeDot(meta.runtimeExtra as CRDTMessageMeta);
      if (!this.buffer.has(dot)) {
        this.buffer.set(dot, { message, messageStacks, meta, caller });
      }
    }

    // The first saved vc entry belongs to the replica that saved it.
    const senderID = decoded.vcKeys[0];
    return new LoadCRDTMeta(
      senderID,
      oldLocalVC,
      remoteVC,
      oldLocalLamportTimestamp,
      remoteLamportTimestamp
    );
  }
}