-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.ts
146 lines (121 loc) · 3.94 KB
/
server.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import * as WebSocket from 'ws';
import * as redisLibrary from 'redis';
import * as bluebird from 'bluebird';
import * as uuidv4 from 'uuid/v4';
// import * as RSMQWorker from 'rsmq-worker';
import {Mutex} from 'async-mutex';
import { CRDTCommand, Operation } from 'shared/dist/enums';
import {CRDTStructure} from 'shared/dist/CRDTStructure';
import { LocalQueue, LocalSubscriber, LocalPublisher } from './LocalQueue';
const redis = bluebird.promisifyAll(redisLibrary).createClient();
const mutex = new Mutex();
const PORT = process.env.PORT ? parseInt(process.env.PORT) : 8080;
const REDIS_DOCUMENT_KEY = 'document';
const REDIS_TOPIC = 'consumers';
const REDIS_QUEUE = 'queue';
const readDocument = async () => {
const json = await redis.getAsync(REDIS_DOCUMENT_KEY);
const document = json ? JSON.parse(json) : {tokens: [], text: ''};
return document;
}
interface CRDTWebSocket extends WebSocket {
windowSessionID: string;
}
const wss = new WebSocket.Server({ port: PORT });
const subscriber = new LocalSubscriber();
subscriber.onConsume(async (key, value) => {
const message = JSON.parse(value);
wss.clients.forEach((client: CRDTWebSocket) => {
if (client.windowSessionID === message.fromWindowSessionID ||
client.readyState !== WebSocket.OPEN) {
return;
}
client.send(JSON.stringify({
command: CRDTCommand.APPLY,
value: message.value,
}));
});
});
const publisher = new LocalPublisher();
publisher.onPublish(async (key, value) => {
subscriber.consume(key, value);
});
// const rsmq = new RSMQWorker(REDIS_QUEUE).start();
const queue = new LocalQueue();
queue.onConsume(async (messageString: string) => {
mutex
.runExclusive(async () => {
try {
const [
document,
llen,
] = await Promise.all([
readDocument(),
redis.llenAsync(REDIS_QUEUE),
]);
const crdtStructure = new CRDTStructure(document.tokens, document.text);
const message = JSON.parse(messageString);
const value = message.value;
value.offset = llen;
// @ts-ignore
// console.log(`delay: ${new Date() - new Date(message.time)}ms`);
switch (value.operation) {
case Operation.INSERT:
crdtStructure.handleRemoteInsert(value.crdtToken);
break;
case Operation.DELETE:
crdtStructure.handleRemoteDelete(value.crdtToken, '');
break;
}
const newDocument = {
tokens: crdtStructure.tokens,
text: crdtStructure.text,
};
// TODO: add to REDIS list for replay
// await redis.rpushAsync(REDIS_QUEUE, messageString);
await Promise.all([
redis.rpushAsync(REDIS_QUEUE, messageString),
redis.setAsync(REDIS_DOCUMENT_KEY, JSON.stringify(newDocument)),
]);
publisher.publish(REDIS_TOPIC, messageString);
} catch (error) {
console.error(error);
// TODO: figure out how do error handling
}
});
});
wss.on('connection', (ws: CRDTWebSocket) => {
console.log(`${new Date()}: Connection open on port: ${PORT}`);
ws.windowSessionID = uuidv4();
ws.on('close', () => {
console.log('disconnected');
});
ws.on('message', async (data) => {
let message;
try {
message = JSON.parse(data.toString());
} catch (error) {
console.error(error);
return;
}
switch (message.command) {
case CRDTCommand.LOAD:
const document = await readDocument();
ws.send(JSON.stringify({
command: CRDTCommand.LOAD,
value: document,
}));
break;
case CRDTCommand.APPLY:
message.fromWindowSessionID = ws.windowSessionID;
message.time = new Date().toISOString();
queue.consume(JSON.stringify(message));
break;
case CRDTCommand.REPLAY:
// TODO
const start = 0;
await redis.lrangeAsync(REDIS_QUEUE, start);
break;
}
});
});