-
Notifications
You must be signed in to change notification settings - Fork 298
/
worker.js
312 lines (262 loc) · 9.45 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
var mongo = require('./mongo');
var k8s = require('./k8s');
var config = require('./config');
var ip = require('ip');
var async = require('async');
var moment = require('moment');
var dns = require('dns');
var os = require('os');
// This host's resolved address and "<address>:<mongoPort>". Both stay false until init() assigns them;
// workloop() refuses to run while either is still falsy.
let hostIp = false;
let hostIpAndPort = false;
// Delay between workloop passes, and how long an unhealthy member may go without a heartbeat before
// it is removed. Both are read once from config when the module loads.
const loopSleepSeconds = config.loopSleepSeconds;
const unhealthySeconds = config.unhealthySeconds;
/**
 * Resolves this machine's hostname and caches the result for the rest of the worker.
 * The approach follows http://stackoverflow.com/questions/3653065/get-local-ip-address-in-node-js
 * On success it sets hostIp to the looked-up address and hostIpAndPort to "<address>:<config.mongoPort>",
 * then calls done() with no arguments. On lookup failure it calls done(err) and leaves both values untouched.
 * NOTE(review): dns.lookup is called without a `family` option, so the address may be IPv6 on
 * dual-stack hosts. Confirm this matches the pod IPs that podElection compares against.
 * @param done node-style completion callback.
 */
var init = function(done) {
  dns.lookup(os.hostname(), function(lookupErr, resolvedAddr) {
    if (lookupErr) {
      return done(lookupErr);
    }
    hostIp = resolvedAddr;
    hostIpAndPort = hostIp + ':' + config.mongoPort;
    done();
  });
};
/**
 * One pass of the sidecar's control loop.
 *
 * It lists the Mongo pods, opens a db connection, keeps only pods that are Running and have an IP,
 * and then dispatches on this instance's replica-set status:
 *   - replSetGetStatus succeeds   -> inReplicaSet
 *   - error code 94               -> notInReplicaSet (MongoDB NotYetInitialized)
 *   - error code 93               -> invalidReplicaSet (MongoDB InvalidReplicaSetConfig)
 *   - any other error             -> handed to finish()
 * Every path ends in finish(err, db), which logs the error, closes the connection if one was opened,
 * and schedules the next pass.
 *
 * @throws {Error} when init() has not yet populated hostIp / hostIpAndPort.
 */
var workloop = function workloop() {
  if (!hostIp || !hostIpAndPort) {
    throw new Error('Must initialize with the host machine\'s addr');
  }

  // MongoDB server error codes that replSetGetStatus uses to describe why it failed.
  var NOT_YET_INITIALIZED = 94;
  var INVALID_REPLICA_SET_CONFIG = 93;

  // Run in series so that, if k8s.getMongoPods fails, no db connection is opened.
  async.series([
    k8s.getMongoPods,
    mongo.getDb
  ], function(err, results) {
    // results[1] only exists once getDb has run. Capture it first so every exit path can close it.
    var db = null;
    if (Array.isArray(results) && results.length === 2) {
      db = results[1];
    }
    if (err) {
      return finish(err, db);
    }

    // Ignore pods that aren't running yet or haven't been assigned an IP address.
    var pods = results[0].filter(function(pod) {
      return pod.status.phase === 'Running' && Boolean(pod.status.podIP);
    });
    if (!pods.length) {
      // Pass db so the connection opened above is closed instead of leaked on every idle pass.
      return finish('No pods are currently running, probably just give them some time.', db);
    }

    // If replSetGetStatus succeeds, this instance is in a replica set.
    // Specific error codes say it is not, or that its config is invalid.
    mongo.replSetGetStatus(db, function(statusErr, status) {
      var onStepDone = function(stepErr) {
        finish(stepErr, db);
      };
      if (!statusErr) {
        return inReplicaSet(db, pods, status, onStepDone);
      }
      if (statusErr.code === NOT_YET_INITIALIZED) {
        return notInReplicaSet(db, pods, onStepDone);
      }
      if (statusErr.code === INVALID_REPLICA_SET_CONFIG) {
        return invalidReplicaSet(db, pods, status, onStepDone);
      }
      finish(statusErr, db);
    });
  });
};
/**
 * Common tail of every workloop pass.
 * It logs `err` if present, closes `db` if it is an object with a `close` method, and schedules
 * the next workloop pass after loopSleepSeconds.
 * @param err error (or message) from the pass; falsy when the pass succeeded.
 * @param db database handle opened during the pass, or null/undefined if none was opened.
 */
var finish = function(err, db) {
  if (err) {
    console.error('Error in workloop', err);
  }
  var closable = Boolean(db && db.close);
  if (closable) {
    db.close();
  }
  var delayMs = loopSleepSeconds * 1000;
  setTimeout(workloop, delayMs);
};
/**
 * Handles an instance that replSetGetStatus reports as already belonging to a replica set.
 *   - This instance is the primary (member with state 1 and `self` set): it reconciles membership via
 *     primaryWork without forcing.
 *   - Another member is primary: nothing to do; that member handles membership changes.
 *   - No member is primary: the pod that wins podElection runs primaryWork with a forced reconfig.
 * @param db database handle, passed through to primaryWork.
 * @param pods running pods from Kubernetes. podElection may re-sort this array in place.
 * @param status replSetGetStatus result. Only `status.members` is read; a missing list counts as empty.
 * @param done node-style callback. It is forwarded to primaryWork when that runs; otherwise it is
 *   called with no arguments.
 */
var inReplicaSet = function(db, pods, status, done) {
  var members = status.members;
  // State 1 is PRIMARY. The first such member is the one considered.
  var primary = (members || []).find(function(member) {
    return member.state === 1;
  });

  if (primary && primary.self) {
    return primaryWork(db, pods, members, false, done);
  }
  if (primary) {
    return done();
  }
  if (podElection(pods)) {
    console.log('Pod has been elected as a secondary to do primary work');
    return primaryWork(db, pods, members, true, done);
  }
  done();
};
/**
 * Does the primary's job: reconciles the replica-set member list with the running pods.
 * Pods missing from `members` are queued for addition (addrToAddLoop), and members that
 * memberShouldBeRemoved flags are queued for removal (addrToRemoveLoop). If either list is
 * non-empty, both are logged and handed to mongo.addNewReplSetMembers; otherwise done() is
 * called directly.
 * @param shouldForce passed through to addNewReplSetMembers (true when a non-primary does this work).
 */
var primaryWork = function(db, pods, members, shouldForce, done) {
  var toAdd = addrToAddLoop(pods, members);
  var toRemove = addrToRemoveLoop(members);
  var nothingToChange = toAdd.length === 0 && toRemove.length === 0;
  if (nothingToChange) {
    done();
    return;
  }
  console.log('Addresses to add: ', toAdd);
  console.log('Addresses to remove: ', toRemove);
  mongo.addNewReplSetMembers(db, toAdd, toRemove, shouldForce, done);
};
/**
 * Handles an instance whose replica set is not yet initialized.
 * It asks every Running pod (in parallel, via mongo.isInReplSet) whether it already belongs to a
 * replica set.
 *   - If any pod does, there is nothing to do; the primary path elsewhere will add this one.
 *   - If none does, the pod that wins podElection initializes the replica set.
 * podElection sorts `pods` in place, so pods[0] is the winner. Its stable network address is
 * preferred, falling back to this host's hostIpAndPort.
 * @param db database handle passed to mongo.initReplSet.
 * @param pods pods reported by Kubernetes.
 * @param done node-style callback. It receives the first isInReplSet error, or is forwarded to
 *   initReplSet, or is called with no arguments when nothing is done.
 */
var notInReplicaSet = function(db, pods, done) {
  var testRequests = pods
    .filter(function(pod) {
      return pod.status.phase === 'Running';
    })
    .map(function(pod) {
      return function(completed) {
        mongo.isInReplSet(pod.status.podIP, completed);
      };
    });

  async.parallel(testRequests, function(err, results) {
    if (err) {
      return done(err);
    }
    // Someone is already in a replica set; nothing to do.
    if ((results || []).some(Boolean)) {
      return done();
    }
    if (!podElection(pods)) {
      return done();
    }
    console.log('Pod has been elected for replica set initialization');
    var primary = pods[0]; // podElection sorted pods, so the winner is first.
    // Prefer the stable network ID over the pod IP, if present.
    var primaryAddressAndPort = getPodStableNetworkAddressAndPort(primary) || hostIpAndPort;
    mongo.initReplSet(db, primaryAddressAndPort, done);
  });
};
/**
 * Recovers from an invalid replica-set config (for example after every node went down).
 * Only the podElection winner acts. It force-reconfigures the set with the adds/removes computed
 * from `pods` and the (possibly empty) member list. Forcing risks a small amount of data loss, but
 * getting out of the invalid state is judged more important.
 * @param status replSetGetStatus result, possibly undefined. Only `status.members` is read.
 * @param done node-style callback. It is called with only the error from addNewReplSetMembers
 *   (extra arguments are dropped), or with no arguments when this pod loses the election.
 */
var invalidReplicaSet = function(db, pods, status, done) {
  var members = (status && status.members) ? status.members : [];
  console.log("Invalid replica set");
  if (podElection(pods)) {
    console.log("Won the pod election, forcing re-initialization");
    var toAdd = addrToAddLoop(pods, members);
    var toRemove = addrToRemoveLoop(members);
    mongo.addNewReplSetMembers(db, toAdd, toRemove, true, function(reconfigErr) {
      done(reconfigErr);
    });
    return;
  }
  console.log("Didn't win the pod election, doing nothing");
  return done();
};
/**
 * Deterministic leader pick shared by every pod.
 * Every pod runs this code independently, so all of them must agree on the same winner. The
 * pods are sorted by numeric IP (ip.toLong) and the LOWEST IP wins.
 *
 * Side effect (relied on by callers): `pods` is sorted IN PLACE, ascending by IP, so afterwards
 * pods[0] is the elected pod.
 *
 * @param pods array of pods, each with `status.podIP`.
 * @returns {boolean} true when this host (hostIp) is the winner. Returns false for an empty list
 *   instead of throwing.
 */
var podElection = function(pods) {
  if (!pods || !pods.length) {
    return false;
  }
  pods.sort(function(a, b) {
    // Numeric comparison; all pods are expected to have distinct IPs.
    return ip.toLong(a.status.podIP) - ip.toLong(b.status.podIP);
  });
  return pods[0].status.podIP === hostIp;
};
/**
 * Computes the addresses of Running pods that are not yet replica-set members.
 * A pod counts as present if any member's `name` equals either its "<podIP>:<port>" address or its
 * stable network address. Either form is enough, so the same pod is never added twice. New
 * entries use the stable network address when available, else the pod IP address.
 * @param pods pods from Kubernetes. Non-Running pods are skipped; a missing list yields [].
 * @param members current replica-set members (objects with `name`). A missing list is treated as empty.
 * @returns {Array} addresses to add, in pod order.
 */
var addrToAddLoop = function(pods, members) {
  var knownMembers = members || [];
  var addrToAdd = [];
  (pods || []).forEach(function(pod) {
    if (pod.status.phase !== 'Running') {
      return;
    }
    var podIpAddr = getPodIpAddressAndPort(pod);
    var podStableNetworkAddr = getPodStableNetworkAddressAndPort(pod);
    // If the pod's IP or its stable network address is already configured, there is no need to re-add it.
    var podInRs = knownMembers.some(function(member) {
      return member.name === podIpAddr || member.name === podStableNetworkAddr;
    });
    if (!podInRs) {
      // Prefer the stable network ID when present.
      addrToAdd.push(podStableNetworkAddr || podIpAddr);
    }
  });
  return addrToAdd;
};
/**
 * Returns the `name` of every replica-set member that memberShouldBeRemoved flags, in member order.
 * @param members current replica-set members. A missing list is treated as empty.
 * @returns {Array} member names to remove.
 */
var addrToRemoveLoop = function(members) {
  return (members || [])
    .filter(function(member) {
      return memberShouldBeRemoved(member);
    })
    .map(function(member) {
      return member.name;
    });
};
/**
 * A member is evicted only when it is reported unhealthy (falsy `health`) AND its last received
 * heartbeat is older than `unhealthySeconds` ago.
 * NOTE(review): when `lastHeartbeatRecv` is absent, moment's isAfter(undefined) compares against
 * "now", so such a member is not removed. Confirm this is intended.
 * @returns {boolean}
 */
var memberShouldBeRemoved = function(member) {
  if (member.health) {
    return false;
  }
  var cutoff = moment().subtract(unhealthySeconds, 'seconds');
  return cutoff.isAfter(member.lastHeartbeatRecv);
};
/**
 * Formats a pod's IP address as "<podIP>:<config.mongoPort>", e.g. WWW.XXX.YYY.ZZZ:27017.
 * @param pod the Kubernetes pod, as returned by the k8s client.
 * @returns string the address with the port attached, or undefined when the pod, its status, or
 *   its podIP is missing.
 */
var getPodIpAddressAndPort = function(pod) {
  var podIp = pod && pod.status && pod.status.podIP;
  if (podIp) {
    return podIp + ':' + config.mongoPort;
  }
};
/**
 * Builds the pod's StatefulSet stable network address with the Mongo port attached, in the form
 * '<pod-name>.<mongo-kubernetes-service>.<pod-namespace>.svc.<cluster-domain>:<mongo-port>'. See:
 * <a href="https://kubernetes.io/docs/concepts/abstractions/controllers/statefulsets/#stable-network-id">Stateful Set documentation</a>
 * for more details.
 * @param pod the Kubernetes pod, containing the information from the k8s client.
 * @returns string the stable network address and port. Returns undefined when
 *   config.k8sMongoServiceName is not set or the pod lacks metadata.name / metadata.namespace.
 *   It does NOT fall back to the pod IP; callers pair it with getPodIpAddressAndPort themselves.
 * NOTE(review): config.k8sClusterDomain is used as-is; if it were unset the result would contain
 * "svc.undefined". Confirm config always provides a default.
 */
var getPodStableNetworkAddressAndPort = function(pod) {
  if (!config.k8sMongoServiceName || !pod || !pod.metadata || !pod.metadata.name || !pod.metadata.namespace) {
    return;
  }
  return pod.metadata.name + '.' + config.k8sMongoServiceName + '.' + pod.metadata.namespace +
    '.svc.' + config.k8sClusterDomain + ':' + config.mongoPort;
};
// Public surface: call init(done) once to resolve the host address, then start workloop().
module.exports = { init, workloop };