Skip to content

Commit

Permalink
feature: support rich balance context
Browse files Browse the repository at this point in the history
  • Loading branch information
hefangshi committed Sep 19, 2016
1 parent 298a5ab commit fad2712
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 26 deletions.
6 changes: 3 additions & 3 deletions lib/ext/balance/hashring.js
Expand Up @@ -65,8 +65,8 @@ HashringBalance.prototype.generateHash = function (hashKey) {
return murmurhash(hashKey.toString(), 20130710);
};

HashringBalance.prototype.fetchServer = function (balanceContext) {
if (balanceContext.balanceKey === undefined || balanceContext.balanceKey === null) {
HashringBalance.prototype.fetchServer = function (balanceContext, conf, prevBackend) {
if (conf.balanceKey === undefined || conf.balanceKey === null) {
throw new Error('balanceKey is needed when using consistent hashing');
}
var servers = balanceContext.reqIDCServers;
Expand All @@ -78,7 +78,7 @@ HashringBalance.prototype.fetchServer = function (balanceContext) {
}
var ringIndex = bs(balanceContext.hashring, {
index: null,
value: this.generateHash(balanceContext.balanceKey)
value: this.generateHash(conf.balanceKey)
}, function (a, b) {
return a.value - b.value;
});
Expand Down
10 changes: 5 additions & 5 deletions lib/ral.js
Expand Up @@ -39,11 +39,11 @@ function RalRunner(serviceName, options) {
this._retryTimes = 0;
this._requestID = Math.ceil(now() * 100000);
this.serviceName = serviceName;
this.chosenBackend = [];

logger.trace('request start requestID=' + this._requestID);
EventEmitter.call(this);
options = options || {};
var payload;
var timer = this.timer = new Timer(['request', 'talk', 'pack', 'write', 'read', 'unpack']);
timer.start('request');
var conf = this.conf = config.getConf(serviceName);
Expand Down Expand Up @@ -82,7 +82,7 @@ function RalRunner(serviceName, options) {
timer.start('request');
timer.start('talk');
logger.trace('start retry request.');
me.doRequest(payload);
me.doRequest();
}
});

Expand All @@ -104,7 +104,7 @@ RalRunner.prototype.doRequest = function () {
* unpack stream
*/
var unpack;
/**
/**
* request body
*/
var payload;
Expand Down Expand Up @@ -281,9 +281,9 @@ RalRunner.prototype.doRequest = function () {
}

// choose a real server
context.balanceContext.balanceKey = conf.balanceKey;
try {
conf.server = context.balance.fetchServer(context.balanceContext);
conf.server = context.balance.fetchServer(context.balanceContext, conf, this.chosenBackend);
this.chosenBackend.push(conf.server);
}
catch (err) {
callReqError(err);
Expand Down
39 changes: 24 additions & 15 deletions test/balance.js
Expand Up @@ -193,15 +193,17 @@ describe('hashring balance', function () {
var balance = new HashringBalance();
var resultStore = {};
for (var i = 0; i < 100; i++) {
context.balanceKey = i;
var server = balance.fetchServer(context);
var server = balance.fetchServer(context, {
balanceKey: i
});
resultStore[server.index] = resultStore[server.index] || [];
resultStore[server.index].push(i);
}
// 哈希应该是稳定的
for (i = 0; i < 100; i++) {
context.balanceKey = i;
server = balance.fetchServer(context);
server = balance.fetchServer(context, {
balanceKey: i
});
resultStore[server.index].should.containEql(i);
}
});
Expand All @@ -212,8 +214,9 @@ describe('hashring balance', function () {
var balance = new HashringBalance();
var resultStore = {};
for (var i = 0; i < 100; i++) {
context.balanceKey = i;
var server = balance.fetchServer(context);
var server = balance.fetchServer(context, {
balanceKey: i
});
resultStore[server.index] = resultStore[server.index] || [];
resultStore[server.index].push(i);
}
Expand All @@ -222,8 +225,9 @@ describe('hashring balance', function () {
var lastServer = context.reqIDCServers.pop();
context.hashring = null;
for (i = 0; i < 100; i++) {
context.balanceKey = i;
server = balance.fetchServer(context);
server = balance.fetchServer(context, {
balanceKey: i
});
if (resultStore[server.index].indexOf(i) === -1) {
resultStore[9].should.containEql(i);
}
Expand All @@ -237,8 +241,9 @@ describe('hashring balance', function () {
var balance = new HashringBalance();
var resultStore = {};
for (var i = 0; i < 100; i++) {
context.balanceKey = i;
var server = balance.fetchServer(context);
var server = balance.fetchServer(context, {
balanceKey: i
});
resultStore[server.index] = resultStore[server.index] || [];
resultStore[server.index].push(i);
}
Expand All @@ -250,8 +255,9 @@ describe('hashring balance', function () {
});
context.hashring = null;
for (i = 0; i < 100; i++) {
context.balanceKey = i;
server = balance.fetchServer(context);
server = balance.fetchServer(context, {
balanceKey: i
});
if (server.index !== '10') {
resultStore[server.index].should.containEql(i);
}
Expand All @@ -267,10 +273,13 @@ describe('hashring balance', function () {
throw new Error();
};
(function () {
context.balanceKey = 1;
balance.fetchServer(context);
balance.fetchServer(context, {
balanceKey: 1
});
}).should.not.throwError();
var server = balance.fetchServer(context);
var server = balance.fetchServer(context, {
balanceKey: 1
});
Math.random = SourceRandom;
server.should.be.eql(conf.bookService.server[0]);
});
Expand Down
7 changes: 4 additions & 3 deletions test/ral.js
Expand Up @@ -350,7 +350,7 @@ describe('ral', function () {
msg: 'hi',
name: '何方石'
},
retry: 2,
retry: 0,
timeout: 100
});
req.on('error', function (err) {
Expand Down Expand Up @@ -537,7 +537,7 @@ describe('ral', function () {
},
pack: 'stream',
unpack: 'stream',
retry: 2,
retry: 0,
timeout: 100
}).on('error', function (err) {
err.message.should.be.match(/invalid pack data/);
Expand Down Expand Up @@ -739,7 +739,8 @@ describe('ral', function () {
before(function (ok) {
isInited.on('done', ok);
});
ral('hashService', {}).on('data', function (data) {
ral('hashService', {
}).on('data', function (data) {
data.should.not.be.ok;
done();
}).on('error', function (err) {
Expand Down

0 comments on commit fad2712

Please sign in to comment.