/
realtime-sub.mixin.ts
65 lines (61 loc) · 2.23 KB
/
realtime-sub.mixin.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import { hook, Mixin } from './mixin';
import { RealtimeClient } from '../realtime.client';
import { Topics } from '../../constants';
import { tryUnzipAsync } from '../../shared';
import { GraphQlMessage, ParsedMessage } from '../parsers';
import { QueryIDs } from '../subscriptions';
import { RealtimeSubDirectDataWrapper } from '../messages';
import { IllegalStateError } from 'mqtts';
/**
 * Mixin that connects the realtime-sub MQTT topic to a {@link RealtimeClient}.
 *
 * Every message received on `Topics.REALTIME_SUB` is re-emitted on the client as
 * `realtimeSub`. In addition, a more specific event is derived from the message:
 * `direct` for direct updates, or the `QueryIDs` key whose value equals the
 * message topic.
 */
export class RealtimeSubMixin extends Mixin {
    /**
     * Registers a `post` hook on the client's `connect` method. When the hook runs,
     * it starts listening on the realtime-sub topic of `client.mqtt`.
     *
     * NOTE(review): presumably the `post` hook runs after `connect` has finished —
     * confirm against the `hook` helper.
     *
     * @param client - Client whose `connect` method is hooked.
     * @throws IllegalStateError - thrown from inside the hook (not from `apply` itself)
     * when `client.mqtt` is not set at the time the hook runs.
     */
    apply(client: RealtimeClient): void {
        hook(client, 'connect', {
            post: () => {
                if (!client.mqtt) {
                    throw new IllegalStateError('No mqtt client created');
                }
                client.mqtt.listen(
                    {
                        topic: Topics.REALTIME_SUB.id,
                        // The payload goes through tryUnzipAsync before it is handed to the topic's parser
                        // (presumably a best-effort decompression — confirm in '../../shared').
                        transformer: async ({ payload }) =>
                            Topics.REALTIME_SUB.parser.parseMessage(Topics.REALTIME_SUB, await tryUnzipAsync(payload)),
                    },
                    data => this.handleRealtimeSub(client, data),
                );
            },
        });
    }

    /**
     * Dispatches one parsed realtime-sub message to the client's event listeners.
     *
     * The parsed message is always emitted as `realtimeSub` first. Afterwards:
     * - a string `message` is JSON-decoded and treated as a direct update,
     * - an object `message` with topic `'direct'` forwards its `json` as a direct update,
     * - any other topic is looked up among the values of `QueryIDs`; on a match the
     *   matching key is emitted with `json` (or `payload` when `json` is falsy).
     *   Topics without a match are ignored.
     *
     * @param client - Client on which the events are emitted.
     * @param parsedMessage - Parsed message, destructured into its `data` and `topic`.
     * @throws SyntaxError - when a string `message` is not valid JSON.
     */
    private handleRealtimeSub(
        client: RealtimeClient,
        { data, topic: messageTopic }: ParsedMessage<GraphQlMessage>,
    ): void {
        const { message } = data;
        client.emit('realtimeSub', { data, topic: messageTopic });

        if (typeof message === 'string') {
            this.emitDirectEvent(client, JSON.parse(message));
            return;
        }

        const { topic, payload, json } = message;
        if (topic === 'direct') {
            this.emitDirectEvent(client, json);
            return;
        }

        const query = Object.entries(QueryIDs).find(([, id]) => id === topic);
        if (query) {
            client.emit(query[0] as keyof typeof QueryIDs, json || payload);
        }
    }

    /**
     * Emits one `direct` event per entry of `parsed.data`.
     *
     * String `value` fields are JSON-decoded in place before anything is emitted, so
     * a malformed value prevents every event of this message (the error propagates).
     *
     * If `parsed` is nullish or its `data` is not an array, nothing is emitted instead
     * of failing with a TypeError: the caller has already forwarded the raw message on
     * `realtimeSub`, so only the derived events are skipped.
     *
     * @param client - Client on which the `direct` events are emitted.
     * @param parsed - Decoded direct update; expected to carry an array in `data`.
     * @throws SyntaxError - when a string `value` is not valid JSON.
     */
    private emitDirectEvent(client: RealtimeClient, parsed: any): void {
        const entries: unknown = parsed == null ? undefined : parsed.data;
        if (!Array.isArray(entries)) {
            return;
        }

        // First pass: decode all values so that either every entry is emitted or none is.
        for (const entry of entries) {
            if (typeof entry.value === 'string') {
                entry.value = JSON.parse(entry.value);
            }
        }

        // Second pass: publish the decoded entries one by one.
        for (const entry of entries) {
            client.emit('direct', entry as RealtimeSubDirectDataWrapper);
        }
    }

    /** Human-readable name of this mixin. */
    get name(): string {
        return 'Realtime Sub';
    }
}