/
rsObservable.ts
87 lines (69 loc) · 2.26 KB
/
rsObservable.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
'use strict';
import { cloneDeep, isString } from 'lodash';
import RsMessage, { RsMessageType } from './rsMessage';
import RsSubscriber from './rsSubscriber';
import { RsSubject } from './resynkd.types';
export default class RsObservable {
private _subjects: Map<string, RsSubject<any>> = new Map<string, RsSubject<any>>();
private _subscribers: Map<string, RsSubscriber> = new Map<string, RsSubscriber>();
public reset(): void {
this._subjects = new Map<string, RsSubject<any>>();
this._subscribers = new Map<string, RsSubscriber>();
}
public addSubject(subjectId: string, subject: RsSubject<any>): boolean {
if (this._subjects.has(subjectId)) throw new Error('Subject already exists.');
this._subjects.set(subjectId, subject);
return true;
}
public message(message: any, send: (msg: string) => any): boolean {
let msg = cloneDeep(message);
if (isString(msg)) {
try {
msg = JSON.parse(msg);
} catch (err) {
}
}
if (msg.rsynkd) {
const { method } = msg.rsynkd;
switch (method) {
case 'subscribe':
return this._subscribe(msg, send);
case 'unsubscribe':
return this._unsubscribe(msg);
}
}
return false;
}
public removeSubscriber(socketId: string): boolean {
this._subscribers.delete(socketId);
return true;
}
private _getOrCreateSubscriber(socketId: string, send: (msg: string) => any) {
let subscriber = this._subscribers.get(socketId);
if (subscriber) return subscriber;
return new RsSubscriber(socketId, send);
}
private _subscribe(msg: RsMessageType, send: (msg: string) => any): boolean {
const { socketId, subjectId } = msg.rsynkd;
let subject = this._subjects.get(subjectId);
if (!subject) {
send(RsMessage('noSuchObservable', socketId, subjectId));
} else {
let subscriber = this._getOrCreateSubscriber(socketId, send);
try {
subscriber.subscribe(subjectId, subject);
} catch (err) {
send(RsMessage('error', socketId, subjectId, {
error: `The socket:${socketId} already subscribed to subject:${subjectId}.`,
}));
}
}
return true;
}
private _unsubscribe(msg: RsMessageType): boolean {
const { socketId, subjectId } = msg.rsynkd;
let subscriber = this._subscribers.get(socketId);
if (subscriber) subscriber.unsubscribe(subjectId);
return true;
}
}