-
Notifications
You must be signed in to change notification settings - Fork 7
/
cluster.js
118 lines (104 loc) · 3.4 KB
/
cluster.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
var events = require('events'),
util = require('util'),
redis = require('redis'),
redisd = require('./redisd');
// "host:port" of the Redis instance this module talks to by default.
var REDIS = 'localhost:6379';

/**
 * Cluster node constructor.
 *
 * Sets up the per-node state, starts a redisd server whose handler turns
 * incoming commands into `id:<command[0]>:<command[1]>` events on this
 * object, and eagerly creates the client for REDIS.
 *
 * @param {*} _port    accepted but not used by this constructor
 * @param {*} _address accepted but not used by this constructor
 */
var Cluster = function(_port, _address) {
  var node = this;

  // Cache of redis clients, keyed by "host:port"; filled by client().
  node._clients = {};
  //Use JSON, tnetstrings or any serialization
  node.serialize = JSON.stringify;
  node.unserialize = JSON.parse;
  // Emitter on which work_loop() publishes the jobs it pops.
  node.worker = new events.EventEmitter();
  // Queue names handed to BLPOP by work_loop().
  node.queues = [];
  // Counter behind next_id() and unique_id().
  node._id = 0;

  // Command handler given to redisd. `this` is whatever redisd binds when
  // it invokes the handler; it must provide encode() and singleline()
  // (presumably the connection/reply object -- confirm in ./redisd).
  function onCommand(command) {
    if (command[0] == 'info') {
      this.encode('redis_version:2.4.5');
      return;
    }
    node.emit('id:' + command[0] + ':' + command[1], command);
    this.singleline('OK');
  }

  node.server = redisd.createServer(onCommand);
  // listen() is called without arguments.
  node.server.listen();
  node.client(REDIS);//cold warm
};
util.inherits(Cluster, events.EventEmitter);
/**
 * Describe this node as "address:port", using what the node's own server
 * reports from address().
 *
 * @returns {string} the server's address and port joined with ':'
 */
Cluster.prototype.self = function() {
  var bound = this.server.address();
  var parts = [bound.address, bound.port];
  return parts.join(':');
};
/**
 * Build an identifier of the form "address:port:n", where address/port come
 * from this node's server and n is the current value of the internal
 * counter. The counter is incremented as a side effect.
 *
 * @returns {string} the generated identifier
 */
Cluster.prototype.unique_id = function() {
  //TODO a job for UUID?
  var bound = this.server.address();
  var pieces = [bound.address, bound.port, this._id++];
  return pieces.join(':');
};
/**
 * Hand out the current value of the internal counter and advance it by one.
 *
 * @returns {number} the counter value before the increment
 */
Cluster.prototype.next_id = function() {
  var issued = this._id++;
  return issued;
};
/**
 * Return the redis client for `key`, creating and caching it on first use.
 *
 * On an 'error' event the client is ended and dropped from the cache, so
 * the next call for the same key builds a fresh one.
 *
 * @param {string} key target in "host:port" form
 * @returns {Object} the cached or newly created redis client
 */
Cluster.prototype.client = function(key) {//lazy clients
  if (this._clients[key] == null) {
    console.log('new client to', key);
    var kv = key.split(':');
    var that = this;
    // Keep a direct reference: the handler below must act on the client
    // that raised the error, not on whatever the cache holds at that time.
    var client = redis.createClient(kv[1], kv[0]);
    this._clients[key] = client;
    client.on('error', function(error) {
      console.warn(error);
      // This client was already evicted by an earlier error. Looking it up
      // through the cache again would either hit `undefined` (and throw) or
      // tear down a replacement client created in the meantime.
      if (that._clients[key] !== client) {
        return;
      }
      client.end();
      delete that._clients[key];
      //[FIXME] disconnecting client is a drama.
    });
  }
  return this._clients[key];
};
/**
 * Send command `what` to the node `who`, with the serialized `arg` as the
 * command's only argument.
 *
 * @param {string} who        target in "host:port" form (key for client())
 * @param {string} what       command name passed to send_command
 * @param {*} arg             value run through this.serialize
 * @param {Function} callback forwarded unchanged to send_command
 */
Cluster.prototype.call = function(who, what, arg, callback) {
  var peer = this.client(who);
  var params = [this.serialize(arg)];
  peer.send_command(what, params, callback);
};
/**
 * Send command `what` to the node `who` with two arguments: the job id
 * coerced to a string, then the serialized `arg`.
 *
 * @param {string} who        target in "host:port" form (key for client())
 * @param {string} what       command name passed to send_command
 * @param {*} job_id          job identifier; sent as a string
 * @param {*} arg             value run through this.serialize
 * @param {Function} callback forwarded unchanged to send_command
 */
Cluster.prototype.answer = function(who, what, job_id, arg, callback) {
  var peer = this.client(who);
  var params = ['' + job_id, this.serialize(arg)];
  peer.send_command(what, params, callback);
};
/**
 * Enqueue a job: takes a fresh id from next_id() and RPUSHes the serialized
 * tuple [action, args, respond_to, job_id] onto `queue` through the REDIS
 * client.
 *
 * @param {string} queue      list to push onto
 * @param {string} action     first element of the job tuple
 * @param {*} args            second element of the job tuple
 * @param {*} respond_to      third element of the job tuple
 * @param {Function} callback forwarded unchanged to rpush
 * @returns {number} the id assigned to the job
 */
Cluster.prototype.work = function(queue, action, args, respond_to, callback) {
  var id = this.next_id();
  var store = this.client(REDIS);
  var job = this.serialize([action, args, respond_to, id]);
  store.rpush(queue, job, callback);
  return id;
};
/**
 * Send command `what` to the node `who`, with the serialized pair
 * [arg, this.self()] as its only argument, so the receiver is told which
 * node sent the request.
 *
 * A send error is only logged with console.warn.
 *
 * @param {string} who        target in "host:port" form (key for client())
 * @param {string} what       command name passed to send_command
 * @param {*} arg             request value, serialized together with self()
 * @param {Function} callback currently unused -- see the TODO below
 */
Cluster.prototype.asyncCall = function(who, what, arg, callback) {
  //response node is provided, simple direct response should be OK
  //or something like that.
  var payload = this.serialize([arg, this.self()]);
  // Go through the lazy per-target client like call()/answer() do; there is
  // no `this.redis` property on a Cluster.
  this.client(who).send_command(what, [payload], function(err, resp) {
    if (err) console.warn(err);
  });
  // TODO: register the callback with its id
  // callback can be call more than one time
};
/**
 * Pop one job from any of this.queues (BLPOP with timeout 0 on the REDIS
 * client), publish it on this.worker, then schedule the next iteration.
 *
 * A job is the array written by work(): [action, args, respond_to, job_id].
 * It is emitted as worker.emit(action, args, respond_to, job_id).
 *
 * Scheduling:
 *  - a reply was received: loop again on the next tick;
 *  - no reply (e.g. the pop failed): retry after one second.
 */
Cluster.prototype.work_loop = function() {
  //[TODO] Raise error if no loop.
  var w = this;
  this.client(REDIS).blpop(this.queues, 0, function(err, resp) {
    if (err) {
      console.log(err);
    }
    if (!resp) {
      //nothing came back, don't flood redis
      setTimeout(function() {
        w.work_loop();
      }, 1000);
      return;
    }
    // resp[1] is the popped payload. Decode it with the configured
    // unserializer (the counterpart of the serializer used by work()), and
    // never let one bad payload stop the loop.
    var job = null;
    try {
      job = w.unserialize(resp[1]);
    } catch (e) {
      console.warn(e);
    }
    if (Array.isArray(job)) {
      var f = job.shift();
      w.worker.emit(f, job[0], job[1], job[2]);
    } else if (job !== null) {
      console.warn('work_loop: ignoring job that is not an array', job);
    }
    process.nextTick(function() {
      w.work_loop();
    });
  });
};
exports.createCluster = function() {
var cluster = new Cluster();
return cluster;
};