-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathtcp.js
124 lines (105 loc) · 3.04 KB
/
tcp.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
const debug = require('diagnostics')('raft')
, argv = require('argh').argv
, LifeRaft = require('../')
, net = require('net');
//
// Create a custom Raft instance which uses a plain TCP server and client to
// communicate back and forth.
//
class TCPRaft extends LifeRaft {
/**
* Initialized, start connecting all the things.
*
* @param {Object} options Options.
* @api private
*/
initialize (options) {
// var raft = this;
const server = net.createServer((socket) => {
socket.on('data', buff => {
var data = JSON.parse(buff.toString());
debug(this.address +':packet#data', data);
this.emit('data', data, data => {
debug(this.address +':packet#reply', data);
socket.write(JSON.stringify(data));
socket.end();
});
});
}).listen(this.address);
this.once('end', function enc() {
server.close();
});
}
/**
* The message to write.
*
* @TODO implement indefinitely sending of packets.
* @param {Object} packet The packet to write to the connection.
* @param {Function} fn Completion callback.
* @api private
*/
write (packet, fn) {
const socket = net.connect(this.address);
debug(this.address +':packet#write', packet);
socket.on('error', fn);
socket.on('data', buff => {
let data;
try { data = JSON.parse(buff.toString()); }
catch (e) { return fn(e); }
debug(this.address +':packet#callback', packet);
fn(undefined, data);
});
socket.setNoDelay(true);
socket.write(JSON.stringify(packet));
}
}
//
// We're going to start with a static list of servers. A minimum cluster size is
// 4 as that only requires majority of 3 servers to have a new leader to be
// assigned. This allows the failure of one single server.
//
const ports = [
8081, 8082,
8083, 8084,
8085, 8086
];
//
// The port number of this Node process.
//
const port = +argv.port || ports[0];
//
// Now that we have all our variables we can safely start up our server with our
// assigned port number.
//
const raft = new TCPRaft(port, {
'election min': 2000,
'election max': 5000,
'heartbeat': 1000
});
raft.on('heartbeat timeout', () => {
debug('heart beat timeout, starting election');
});
raft.on('term change', (to, from) => {
debug('were now running on term %s -- was %s', to, from);
}).on('leader change', function (to, from) {
debug('we have a new leader to: %s -- was %s', to, from);
}).on('state change', function (to, from) {
debug('we have a state to: %s -- was %s', to, from);
});
raft.on('leader', () => {
console.log('@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@');
console.log('I am elected as leader');
console.log('@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@');
});
raft.on('candidate', () => {
console.log('----------------------------------');
console.log('I am starting as candidate');
console.log('----------------------------------');
});
//
// Join in other nodes so they start searching for each other.
//
ports.forEach(nr => {
if (!nr || port === nr) return;
raft.join(nr);
});