Skip to content

Commit

Permalink
Add load balance and socket pool.
Browse files Browse the repository at this point in the history
  • Loading branch information
Corey600 committed Sep 15, 2017
1 parent af87ec9 commit eedf806
Show file tree
Hide file tree
Showing 11 changed files with 671 additions and 167 deletions.
88 changes: 20 additions & 68 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@

'use strict';

// require core modules
var url = require('url');
var querystring = require('querystring');

// require thirdpart modules
var zookeeper = require('node-zookeeper-client');

Expand All @@ -17,9 +13,9 @@ var Invoker = require('./lib/invoker');
/**
* Constructor of ZD.
*
* @param {Object} conf
* @param {String|Object} conf // A String of host:port pairs or an object to set options.
* {
* dubbo: String // dubbo version information
* dubbo: String // Dubbo version information.
*
* // The following content could reference:
* // https://github.com/alexguan/node-zookeeper-client#client-createclientconnectionstring-options
Expand All @@ -34,18 +30,13 @@ var Invoker = require('./lib/invoker');
*/
function ZD(conf) {
if (!(this instanceof ZD)) return new ZD(conf);
var config = conf || {};
this._dubbo = config.dubbo;
this._conn = config.conn;
this._client = zookeeper.createClient(this._conn, {
var config = ('string' === typeof conf) ? { conn: conf } : (conf || {});
this.dubbo = config.dubbo;
this.client = zookeeper.createClient(config.conn, {
sessionTimeout: config.sessionTimeout,
spinDelay: config.spinDelay,
retries: config.retries
});
this.client = this._client;

this._cache = {};
this.cache = this._cache;
}

/**
Expand All @@ -54,10 +45,10 @@ function ZD(conf) {
* @public
*/
ZD.prototype.connect = function () {
if (!this._client || !this._client.connect) {
if (!this.client || !this.client.connect) {
return;
}
this._client.connect();
this.client.connect();
};

/**
Expand All @@ -66,10 +57,10 @@ ZD.prototype.connect = function () {
* @public
*/
ZD.prototype.close = function () {
if (!this._client || !this._client.close) {
if (!this.client || !this.client.close) {
return;
}
this._client.close();
this.client.close();
};

/**
Expand All @@ -80,69 +71,30 @@ ZD.prototype.close = function () {
* {
* version: String
* timeout: Number
* poolMax: Number
* poolMin: Number
* }
* @returns {Invoker}
* @public
*/
ZD.prototype.getInvoker = function (path, opt) {
var self = this;
var option = opt || {};
return new Invoker(this, {
return new Invoker(self.client, {
path: path,
dubbo: self.dubbo,
timeout: option.timeout,
version: (option.version || '0.0.0').toUpperCase()
version: option.version,
poolMax: option.poolMax,
poolMin: option.poolMin
});
};

/**
* Get a provider from the registry.
*
* @param {String} path
* @param {String} version
* @param {Function} cb
* @returns {*}
* @public
* Expose `Invoker`. To create a Invoker with URIs directly.
* @type {Invoker}
*/
ZD.prototype.getProvider = function (path, version, cb) {
var self = this;
var _path = '/dubbo/' + path + '/providers';
return self._client.getChildren(_path, function (err, children) {
var child;
var parsed;
var provider;
var i;
var l;
if (err) {
return cb(err);
}

if (children && !children.length) {
return cb('Can\'t find children from the node: ' + _path +
' ,please check the path!');
}

try {
for (i = 0, l = children.length; i < l; i += 1) {
child = querystring.parse(decodeURIComponent(children[i]));
// console.log(child);
if (child.version === version) {
break;
}
}

parsed = url.parse(Object.keys(child)[0]);
provider = {
host: parsed.hostname,
port: parsed.port,
methods: child.methods.split(',')
};
self._cache[path] = provider;
} catch (e) {
return cb(e);
}

return cb(false, provider);
});
};
ZD.Invoker = Invoker;

/**
* Expose `ZD`.
Expand Down
34 changes: 25 additions & 9 deletions lib/codec.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,14 @@ function Codec() {
* http://dubbo.io/dubbo_protocol_header.jpg-version=1&modificationDate=1335251744000.jpg
*
* @param {Invoker} invoker
* @param {Object} provider
* @param {String} method
* @param {Array} args
* @returns {Buffer}
* @public
*/
Codec.prototype.encodeRequest = function (invoker, method, args) {
var body = this._buildBody(invoker, method, args);
Codec.prototype.encodeRequest = function (invoker, provider, method, args) {
var body = this._buildBody(invoker, provider, method, args);
var head = this._buildHead(body.length);
return Buffer.concat([head, body]);
};
Expand Down Expand Up @@ -176,7 +177,7 @@ Codec.prototype.decodeResponse = function (cb) {
* -----------------------------------------------------------------------------------------------
*
* @param {Number} length 长度
* @returns {Buffer}
* @returns {SafeBuffer}
* @private
*/
Codec.prototype._buildHead = function (length) {
Expand Down Expand Up @@ -206,17 +207,23 @@ Codec.prototype._buildHead = function (length) {
* 7. 将整个附属信息map对象attachments序列化
*
* @param {Invoker} invoker
* @param {Object} provider
* @param {String} method
* @param {Array} args
* @returns {Buffer}
* @private
*/
Codec.prototype._buildBody = function (invoker, method, args) {
Codec.prototype._buildBody = function (invoker, provider, method, args) {
var encoder = new hessian.EncoderV2();

encoder.write(invoker._zd._dubbo);
encoder.write(invoker._path);
encoder.write(invoker._version);
// todo check
var path = invoker._path;
var dubbo = invoker._dubbo || provider.dubbo;
var version = invoker._version || provider.version;
var timeout = (invoker._timeout || provider['default.timeout'] || '60000') + '';

encoder.write(dubbo);
encoder.write(path);
encoder.write(version);
encoder.write(method);

var index;
Expand All @@ -243,7 +250,16 @@ Codec.prototype._buildBody = function (invoker, method, args) {
}

// 将整个附属信息map对象attachments序列化
encoder.write(invoker._attchments);
var _attchments = {
$class: 'java.util.HashMap',
$: {
path: path,
interface: path,
timeout: timeout,
version: version
}
};
encoder.write(_attchments);

var byteBuffer = encoder.byteBuffer;
byteBuffer = byteBuffer.get(0, encoder.byteBuffer._offset);
Expand Down
Loading

0 comments on commit eedf806

Please sign in to comment.