/
message-sync.mixin.ts
80 lines (75 loc) · 2.57 KB
/
message-sync.mixin.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import { hook, Mixin } from './mixin';
import { RealtimeClient } from '../realtime.client';
import { Topics } from '../../constants';
import { tryUnzipAsync } from '../../shared';
import { IrisParserData } from '../parsers';
import { IllegalStateError } from 'mqtts';
/**
 * Mixin that, once the realtime client has connected, listens on the
 * message-sync topic and re-emits every received sync entry as one of the
 * client events `iris`, `message` or `threadUpdate`.
 */
export class MessageSyncMixin extends Mixin {
  /**
   * Installs a post-`connect` hook on the client that registers the
   * message-sync listener on the client's mqtt connection.
   *
   * @param client Realtime client to attach to.
   * @throws IllegalStateError (inside the hook) when `client.mqtt` is not set
   *         after connecting.
   */
  apply(client: RealtimeClient): void {
    hook(client, 'connect', {
      post: () => {
        if (!client.mqtt) {
          throw new IllegalStateError('No mqtt client created');
        }
        client.mqtt.listen(
          {
            topic: Topics.MESSAGE_SYNC.id,
            // Unzip the payload (if possible), parse it with the topic's
            // parser and keep only the `data` of every parsed message.
            transformer: async ({ payload }) =>
              Topics.MESSAGE_SYNC.parser
                .parseMessage(Topics.MESSAGE_SYNC, await tryUnzipAsync(payload))
                .map(msg => msg.data),
          },
          data => this.handleMessageSync(client, data),
        );
      },
    });
  }

  /**
   * Dispatches parsed sync entries as client events:
   *
   * - entry without `data`                        -> `iris` with the entry itself
   * - patch without `path`                        -> `iris` with entry and patch merged
   * - patch under `/direct_v2/threads` with value -> `message`
   * - every other patch                           -> `threadUpdate`
   *
   * @param client   Client on which the events are emitted.
   * @param syncData Entries produced by the message-sync transformer.
   */
  private handleMessageSync(client: RealtimeClient, syncData: IrisParserData[]): void {
    for (const element of syncData) {
      const data = element.data;
      if (!data) {
        client.emit('iris', element);
        continue;
      }
      // Drop the patch list from the entry so it is not copied into every
      // event that spreads `element` below.
      delete element.data;
      for (const e of data) {
        if (!e.path) {
          client.emit('iris', { ...element, ...e });
          // Nothing else can be derived from a patch without a path; without
          // this early exit the `startsWith` call below would throw.
          continue;
        }
        const threadId = MessageSyncMixin.getThreadIdFromPath(e.path);
        if (e.path.startsWith('/direct_v2/threads') && e.value) {
          client.emit('message', {
            ...element,
            message: {
              path: e.path,
              op: e.op,
              thread_id: threadId,
              // Spread last: fields of the parsed value take precedence.
              ...JSON.parse(e.value),
            },
          });
        } else {
          client.emit('threadUpdate', {
            ...element,
            meta: {
              path: e.path,
              op: e.op,
              thread_id: threadId,
            },
            // This branch is also reached when the patch has no value;
            // JSON.parse on undefined/'' throws, so fall back to an empty
            // update in that case.
            update: e.value ? { ...JSON.parse(e.value) } : {},
          });
        }
      }
    }
  }

  /**
   * Extracts the numeric thread id from a path starting with
   * `/direct_v2/threads/<digits>` or `/direct_v2/inbox/threads/<digits>`.
   *
   * @param path Patch path to inspect.
   * @returns The matched digits, or `undefined` when neither pattern matches.
   */
  private static getThreadIdFromPath(path: string): string | undefined {
    const itemMatch = path.match(/^\/direct_v2\/threads\/(\d+)/);
    if (itemMatch) return itemMatch[1];
    const inboxMatch = path.match(/^\/direct_v2\/inbox\/threads\/(\d+)/);
    if (inboxMatch) return inboxMatch[1];
    return undefined;
  }

  /** Display name of this mixin. */
  get name(): string {
    return 'Message Sync';
  }
}