-
Notifications
You must be signed in to change notification settings - Fork 0
/
channel.js
76 lines (67 loc) · 2.07 KB
/
channel.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
require('dotenv').config();
const crypto = require('crypto');
const {
stringToHostAndPort, getCurrentIp,
setPending, publishToChannel, getReplicasConfig, setRole,
} = require('./util');
const SocketConnection = require('./connection');
// const ROLE = process.env.ROLE || 'replica';
const CLUSTER_MODE = process.env.CLUSTER_MODE || 'off';
const ROLE = CLUSTER_MODE === 'on' ? 'replica' : 'master';
const randomId = () => crypto.randomBytes(8).toString('hex');
class Channel {
constructor() {
this.queueChannels = {};
this.connections = [];
this.id = randomId();
this.init();
}
async init() {
this.ip = await getCurrentIp();
if (ROLE === 'master') {
const role = await setRole(this.ip);
this.role = role;
const message = { method: 'join', ip: this.ip, role: this.role };
await publishToChannel(message);
return;
}
// check replica
this.role = 'replica';
await setPending(this.ip);
const message = { method: 'join', ip: this.ip, role: this.role };
await publishToChannel(message);
this.createReplicaConnections();
}
async createReplicaConnections() {
if (this.role !== 'master') {
return;
}
const replicasConfig = await getReplicasConfig();
replicasConfig.forEach(async (replicaConfig) => {
const connection = new SocketConnection(replicaConfig);
this.connections.push(connection);
this.setQueue(connection);
});
}
async createReplicaConnection(ip) {
if (this.role !== 'master') {
return;
}
const replicaConfig = stringToHostAndPort(ip);
const connection = new SocketConnection(replicaConfig);
this.connections.push(connection);
this.setQueue(connection);
}
setQueue(connection) {
const message = { id: this.id, method: 'setQueue', queueChannels: this.queueChannels };
connection.send(message);
}
async send(message) {
for (let i = 0; i < this.connections.length; i++) {
const connection = this.connections[i];
connection.send(message);
}
}
}
const channel = new Channel();
module.exports = channel;