Permalink
Browse files

feat: integrated with sofa-rpc-node

  • Loading branch information...
gxcsoccer committed Aug 23, 2018
1 parent bb7dca7 commit e280fdbb483a53edeed25302ab8adbad569e71d9
Showing with 3,221 additions and 483 deletions.
  1. +2 −2 .travis.yml
  2. +89 −42 README.md
  3. +3 −3 appveyor.yml
  4. +33 −0 example/client.js
  5. +36 −0 example/server.js
  6. +0 −23 index.js
  7. +31 −37 lib/decoder.js
  8. +187 −0 lib/encoder.js
  9. +18 −0 lib/index.js
  10. +0 −41 lib/protocol/dubbo/index.js
  11. +0 −10 lib/protocol/dubbo/result.js
  12. +73 −0 lib/protocol/index.js
  13. 0 lib/protocol/{dubbo → }/invocation.js
  14. +45 −27 lib/protocol/{dubbo → }/request.js
  15. +54 −38 lib/protocol/{dubbo → }/response.js
  16. +192 −0 lib/serialize/hessian/compile.js
  17. +7 −2 lib/serialize/{hessian.js → hessian/index.js}
  18. +6 −0 lib/serialize/hessian/primitive_type/boolean.js
  19. +135 −0 lib/serialize/hessian/primitive_type/custom_map.js
  20. +6 −0 lib/serialize/hessian/primitive_type/double.js
  21. +6 −0 lib/serialize/hessian/primitive_type/int.js
  22. +6 −0 lib/serialize/hessian/primitive_type/java.lang.boolean.js
  23. +29 −0 lib/serialize/hessian/primitive_type/java.lang.class.js
  24. +6 −0 lib/serialize/hessian/primitive_type/java.lang.double.js
  25. +73 −0 lib/serialize/hessian/primitive_type/java.lang.exception.js
  26. +6 −0 lib/serialize/hessian/primitive_type/java.lang.integer.js
  27. +6 −0 lib/serialize/hessian/primitive_type/java.lang.long.js
  28. +11 −0 lib/serialize/hessian/primitive_type/java.lang.object.js
  29. +50 −0 lib/serialize/hessian/primitive_type/java.lang.stacktraceelement.js
  30. +7 −0 lib/serialize/hessian/primitive_type/java.lang.string.js
  31. +25 −0 lib/serialize/hessian/primitive_type/java.math.bigdecimal.js
  32. +28 −0 lib/serialize/hessian/primitive_type/java.util.arraylist.js
  33. +25 −0 lib/serialize/hessian/primitive_type/java.util.currency.js
  34. +7 −0 lib/serialize/hessian/primitive_type/java.util.date.js
  35. +43 −0 lib/serialize/hessian/primitive_type/java.util.list.js
  36. +25 −0 lib/serialize/hessian/primitive_type/java.util.locale.js
  37. +72 −0 lib/serialize/hessian/primitive_type/java.util.map.js
  38. +6 −0 lib/serialize/hessian/primitive_type/long.js
  39. +50 −0 lib/serialize/hessian/utils.js
  40. +7 −7 lib/utils.js
  41. +24 −16 package.json
  42. +263 −0 test/fixtures/class_map.js
  43. +1,139 −0 test/hessian.test.js
  44. +223 −234 test/index.test.js
  45. +166 −0 test/protocol.test.js
  46. +1 −1 test/utils.test.js
@@ -1,8 +1,8 @@
sudo: false
language: node_js
node_js:
- '6'
- '7'
- '8'
- '10'
install:
- npm i npminstall && npminstall
script:
131 README.md
@@ -85,49 +85,96 @@ packet status not ok
$ npm install dubbo-remoting --save
```
## API
- `decoder(url)` get decoder of the connection with certain url
- @param {String} connection url
- @return {DubboDecoder}
```js
const net = require('net');
const protocol = require('dubbo-remoting');
const url = 'dubbo://127.0.0.0:12200/com.xxx.DemoService?_TIMEOUT=2000&_p=4&application=xx&default.service.filter=dragoon&dubbo=2.6.1&interface=com.xxx.DemoService&methods=sayHello&pid=25381&revision=2.6.1&side=provider&threads=300&timeout=2000&timestamp=1487081081346&v=2.0&version=1.0.0';
const decoder = protocol.decoder(url)
const socket = net.connect(12200, '127.0.0.1');
socket.pipe(decoder);
decoder.on('packet', p => {
console.log('packet', p);
## Usage
You can use this dubbo protocol implementation with the [sofa-rpc-node](https://github.com/alipay/sofa-rpc-node)
### 1. Install & Launch zk
```bash
$ brew install zookeeper
$ zkServer start
ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Starting zookeeper ... STARTED
```
### 2. Expose a dubbo service
```js
'use strict';
const { RpcServer } = require('sofa-rpc-node').server;
const { ZookeeperRegistry } = require('sofa-rpc-node').registry;
const protocol = require('dubbo-remoting');
const logger = console;
// 1. create zk registry client
const registry = new ZookeeperRegistry({
logger,
address: '127.0.0.1:2181',
});
// 2. create rpc server
const server = new RpcServer({
logger,
registry,
port: 12200,
protocol,
});
// 3. add service
server.addService({
interfaceName: 'com.nodejs.test.TestService',
}, {
async plus(a, b) {
return a + b;
},
});
// 4. launch the server
server.start()
.then(() => {
server.publish();
});
socket.on('connect', () => {
console.log('connected');
```
### 3. Call the dubbo service
```js
'use strict';
const { RpcClient } = require('sofa-rpc-node').client;
const { ZookeeperRegistry } = require('sofa-rpc-node').registry;
const protocol = require('dubbo-remoting');
const logger = console;
// 1. create zk registry client
const registry = new ZookeeperRegistry({
logger,
address: '127.0.0.1:2181',
});
async function invoke() {
// 2. create rpc client with dubbo protocol
const client = new RpcClient({
logger,
registry,
protocol,
});
socket.on('error', err => {
console.error('err', err);
// 3. create rpc service consumer
const consumer = client.createConsumer({
interfaceName: 'com.nodejs.test.TestService',
});
// 4. wait consumer ready
await consumer.ready();
const Request = protocol.Request;
const Invocation = protocol.Invocation;
const req = new Request();
req.data = new Invocation({
methodName: 'sayHello',
args: ['zongyu'],
attachments: {
path: 'com.xxx.DemoService',
interface: 'com.xxx.DemoService',
version: '1.0.0',
timeout: 2000,
},
});
socket.write(req.encode());
```
- `DubboDecoder` an writable stream, your can pipe socket to it
- `Request` the Dubbo request
- `Invocation` the abstraction of the Dubbo service invocation
- `Response` the Dubbo response
- `Result` the abstraction of the Dubbo service result
// 5. call the service
const result = await consumer.invoke('plus', [1, 2], { responseTimeout: 3000 });
console.log('1 + 2 = ' + result);
}
invoke().catch(console.error);
```
@@ -1,7 +1,7 @@
environment:
matrix:
- nodejs_version: '6'
- nodejs_version: '7'
- nodejs_version: '8'
- nodejs_version: '10'
install:
- ps: Install-Product node $env:nodejs_version
@@ -10,6 +10,6 @@ install:
test_script:
- node --version
- npm --version
- npm run ci
- npm run test
build: off
@@ -0,0 +1,33 @@
'use strict';
const { RpcClient } = require('sofa-rpc-node').client;
const { ZookeeperRegistry } = require('sofa-rpc-node').registry;
const protocol = require('..');
const logger = console;
// 1. 创建 zk 注册中心客户端
const registry = new ZookeeperRegistry({
logger,
address: '127.0.0.1:2181',
});
async function invoke() {
// 2. 创建 RPC Client 实例
const client = new RpcClient({
logger,
registry,
protocol,
});
// 3. 创建服务的 consumer
const consumer = client.createConsumer({
interfaceName: 'com.nodejs.test.TestService',
});
// 4. 等待 consumer ready(从注册中心订阅服务列表...)
await consumer.ready();
// 5. 执行泛化调用
const result = await consumer.invoke('plus', [1, 2], { responseTimeout: 3000 });
console.log('1 + 2 = ' + result);
}
invoke().catch(console.error);
@@ -0,0 +1,36 @@
'use strict';
const { RpcServer } = require('sofa-rpc-node').server;
const { ZookeeperRegistry } = require('sofa-rpc-node').registry;
const protocol = require('..');
const logger = console;
// 1. 创建 zk 注册中心客户端
const registry = new ZookeeperRegistry({
logger,
address: '127.0.0.1:2181', // 需要本地启动一个 zkServer
});
// 2. 创建 RPC Server 实例
const server = new RpcServer({
logger,
registry, // 传入注册中心客户端
port: 12200,
protocol,
});
// 3. 添加服务
server.addService({
interfaceName: 'com.nodejs.test.TestService',
}, {
async plus(a, b) {
return a + b;
},
});
// 4. 启动 Server 并发布服务
server.start()
.then(() => {
server.publish();
});

This file was deleted.

Oops, something went wrong.
@@ -1,67 +1,61 @@
'use strict';
const is = require('is-type-of');
const assert = require('assert');
const utils = require('./utils');
const urlparse = require('url').parse;
const protocol = require('./protocol');
const Writable = require('stream').Writable;
// 协议实现
const protocolMap = {
dubbo: require('./protocol/dubbo'),
exchange: require('./protocol/dubbo'),
};
const HEADER_LENGTH = 16;
class DubboDecoder extends Writable {
constructor(options) {
assert(options && is.string(options.url), '[dubbo-remoting] options.url is required');
constructor(options = {}) {
super(options);
this._buf = null;
this._url = urlparse(options.url, true);
const proto = this._url.protocol.replace(/:?$/, ''); // trim tail ":"
this._protocol = protocolMap[proto];
assert(this._protocol, `[dubbo-remoting] unsupport protocol => ${proto}`);
}
/**
* 根据 url 返回匹配的协议
*
* @property {Object} DubboDecoder#protocol
*/
get protocol() {
return this._protocol;
this.options = options;
}
_write(chunk, encoding, callback) {
// merge old & new bytes
this._buf = this._buf ? utils.concatBuffer(this._buf, chunk) : chunk;
// 合并 buf 中的数据
this._buf = this._buf ? Buffer.concat([ this._buf, chunk ]) : chunk;
try {
let unfinish = false;
do {
unfinish = this._decode();
} while (unfinish);
callback();
} catch (err) {
err.name = 'DubboDecodeError';
err.data = this._buf ? this._buf.toString('base64') : '';
callback(err);
}
}
_decode() {
const ret = this.protocol.decode(this._buf);
if (ret) {
// 这里异步化是为了避免 listeners 业务报错影响到 decoder
process.nextTick(() => { this.emit('packet', ret.packet); });
const bufLength = this._buf.length;
const bufSize = this._buf.length;
const rest = bufSize - ret.total;
if (rest > 0) {
this._buf = this._buf.slice(ret.total);
return true;
}
this._buf = null;
if (bufLength < HEADER_LENGTH) {
return false;
}
const bodyLen = this._buf.readInt32BE(12);
const packetLength = bodyLen + HEADER_LENGTH;
if (packetLength === 0 || bufLength < packetLength) {
return false;
}
const packet = this._buf.slice(0, packetLength);
// 调用反序列化方法获取对象
const obj = protocol.decode(packet, this.options);
this.emit(obj.packetType, obj);
const restLen = bufLength - packetLength;
if (restLen) {
this._buf = this._buf.slice(packetLength);
return true;
}
this._buf = null;
return false;
}
_destroy() {
this._buf = null;
this.emit('close');
}
}
module.exports = DubboDecoder;
Oops, something went wrong.

0 comments on commit e280fdb

Please sign in to comment.