Skip to content

Commit

Permalink
feat: integrated with sofa-rpc-node
Browse files Browse the repository at this point in the history
  • Loading branch information
gxcsoccer committed Aug 23, 2018
1 parent bb7dca7 commit e280fdb
Show file tree
Hide file tree
Showing 46 changed files with 3,221 additions and 483 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -1,8 +1,8 @@
sudo: false
language: node_js
node_js:
- '6'
- '7'
- '8'
- '10'
install:
- npm i npminstall && npminstall
script:
Expand Down
131 changes: 89 additions & 42 deletions README.md
Expand Up @@ -85,49 +85,96 @@ packet status not ok
$ npm install dubbo-remoting --save
```

## API

- `decoder(url)` get decoder of the connection with certain url
- @param {String} connection url
- @return {DubboDecoder}

```js
const net = require('net');
const protocol = require('dubbo-remoting');
const url = 'dubbo://127.0.0.0:12200/com.xxx.DemoService?_TIMEOUT=2000&_p=4&application=xx&default.service.filter=dragoon&dubbo=2.6.1&interface=com.xxx.DemoService&methods=sayHello&pid=25381&revision=2.6.1&side=provider&threads=300&timeout=2000&timestamp=1487081081346&v=2.0&version=1.0.0';
const decoder = protocol.decoder(url)

const socket = net.connect(12200, '127.0.0.1');
socket.pipe(decoder);

decoder.on('packet', p => {
console.log('packet', p);
## Usage

You can use this dubbo protocol implementation with the [sofa-rpc-node](https://github.com/alipay/sofa-rpc-node)

### 1. Install & Launch zk

```bash
$ brew install zookeeper

$ zkServer start
ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Starting zookeeper ... STARTED
```

### 2. Expose a dubbo service

```js
'use strict';

const { RpcServer } = require('sofa-rpc-node').server;
const { ZookeeperRegistry } = require('sofa-rpc-node').registry;
const protocol = require('dubbo-remoting');

const logger = console;

// 1. create zk registry client
const registry = new ZookeeperRegistry({
logger,
address: '127.0.0.1:2181',
});

// 2. create rpc server
const server = new RpcServer({
logger,
registry,
port: 12200,
protocol,
});

// 3. add service
server.addService({
interfaceName: 'com.nodejs.test.TestService',
}, {
async plus(a, b) {
return a + b;
},
});

// 4. launch the server
server.start()
.then(() => {
server.publish();
});
socket.on('connect', () => {
console.log('connected');
```

### 3. Call the dubbo service

```js
'use strict';

const { RpcClient } = require('sofa-rpc-node').client;
const { ZookeeperRegistry } = require('sofa-rpc-node').registry;
const protocol = require('dubbo-remoting');
const logger = console;

// 1. create zk registry client
const registry = new ZookeeperRegistry({
logger,
address: '127.0.0.1:2181',
});

async function invoke() {
// 2. create rpc client with dubbo protocol
const client = new RpcClient({
logger,
registry,
protocol,
});
socket.on('error', err => {
console.error('err', err);
// 3. create rpc service consumer
const consumer = client.createConsumer({
interfaceName: 'com.nodejs.test.TestService',
});
// 4. wait consumer ready
await consumer.ready();

const Request = protocol.Request;
const Invocation = protocol.Invocation;
const req = new Request();
req.data = new Invocation({
methodName: 'sayHello',
args: ['zongyu'],
attachments: {
path: 'com.xxx.DemoService',
interface: 'com.xxx.DemoService',
version: '1.0.0',
timeout: 2000,
},
});
socket.write(req.encode());
```

- `DubboDecoder` an writable stream, your can pipe socket to it
- `Request` the Dubbo request
- `Invocation` the abstraction of the Dubbo service invocation
- `Response` the Dubbo response
- `Result` the abstraction of the Dubbo service result
// 5. call the service
const result = await consumer.invoke('plus', [1, 2], { responseTimeout: 3000 });
console.log('1 + 2 = ' + result);
}

invoke().catch(console.error);
```
6 changes: 3 additions & 3 deletions appveyor.yml
@@ -1,7 +1,7 @@
environment:
matrix:
- nodejs_version: '6'
- nodejs_version: '7'
- nodejs_version: '8'
- nodejs_version: '10'

install:
- ps: Install-Product node $env:nodejs_version
Expand All @@ -10,6 +10,6 @@ install:
test_script:
- node --version
- npm --version
- npm run ci
- npm run test

build: off
33 changes: 33 additions & 0 deletions example/client.js
@@ -0,0 +1,33 @@
'use strict';

const { RpcClient } = require('sofa-rpc-node').client;
const { ZookeeperRegistry } = require('sofa-rpc-node').registry;
const protocol = require('..');
const logger = console;

// 1. 创建 zk 注册中心客户端
const registry = new ZookeeperRegistry({
logger,
address: '127.0.0.1:2181',
});

async function invoke() {
// 2. 创建 RPC Client 实例
const client = new RpcClient({
logger,
registry,
protocol,
});
// 3. 创建服务的 consumer
const consumer = client.createConsumer({
interfaceName: 'com.nodejs.test.TestService',
});
// 4. 等待 consumer ready(从注册中心订阅服务列表...)
await consumer.ready();

// 5. 执行泛化调用
const result = await consumer.invoke('plus', [1, 2], { responseTimeout: 3000 });
console.log('1 + 2 = ' + result);
}

invoke().catch(console.error);
36 changes: 36 additions & 0 deletions example/server.js
@@ -0,0 +1,36 @@
'use strict';

const { RpcServer } = require('sofa-rpc-node').server;
const { ZookeeperRegistry } = require('sofa-rpc-node').registry;
const protocol = require('..');

const logger = console;

// 1. 创建 zk 注册中心客户端
const registry = new ZookeeperRegistry({
logger,
address: '127.0.0.1:2181', // 需要本地启动一个 zkServer
});

// 2. 创建 RPC Server 实例
const server = new RpcServer({
logger,
registry, // 传入注册中心客户端
port: 12200,
protocol,
});

// 3. 添加服务
server.addService({
interfaceName: 'com.nodejs.test.TestService',
}, {
async plus(a, b) {
return a + b;
},
});

// 4. 启动 Server 并发布服务
server.start()
.then(() => {
server.publish();
});
23 changes: 0 additions & 23 deletions index.js

This file was deleted.

68 changes: 31 additions & 37 deletions lib/decoder.js
@@ -1,67 +1,61 @@
'use strict';

const is = require('is-type-of');
const assert = require('assert');
const utils = require('./utils');
const urlparse = require('url').parse;
const protocol = require('./protocol');
const Writable = require('stream').Writable;

// 协议实现
const protocolMap = {
dubbo: require('./protocol/dubbo'),
exchange: require('./protocol/dubbo'),
};
const HEADER_LENGTH = 16;

class DubboDecoder extends Writable {
constructor(options) {
assert(options && is.string(options.url), '[dubbo-remoting] options.url is required');
constructor(options = {}) {
super(options);
this._buf = null;
this._url = urlparse(options.url, true);
const proto = this._url.protocol.replace(/:?$/, ''); // trim tail ":"
this._protocol = protocolMap[proto];
assert(this._protocol, `[dubbo-remoting] unsupport protocol => ${proto}`);
}

/**
* 根据 url 返回匹配的协议
*
* @property {Object} DubboDecoder#protocol
*/
get protocol() {
return this._protocol;
this.options = options;
}

_write(chunk, encoding, callback) {
// merge old & new bytes
this._buf = this._buf ? utils.concatBuffer(this._buf, chunk) : chunk;
// 合并 buf 中的数据
this._buf = this._buf ? Buffer.concat([ this._buf, chunk ]) : chunk;
try {
let unfinish = false;
do {
unfinish = this._decode();
} while (unfinish);
callback();
} catch (err) {
err.name = 'DubboDecodeError';
err.data = this._buf ? this._buf.toString('base64') : '';
callback(err);
}
}

_decode() {
const ret = this.protocol.decode(this._buf);
if (ret) {
// 这里异步化是为了避免 listeners 业务报错影响到 decoder
process.nextTick(() => { this.emit('packet', ret.packet); });
const bufLength = this._buf.length;

const bufSize = this._buf.length;
const rest = bufSize - ret.total;
if (rest > 0) {
this._buf = this._buf.slice(ret.total);
return true;
}
this._buf = null;
if (bufLength < HEADER_LENGTH) {
return false;
}
const bodyLen = this._buf.readInt32BE(12);
const packetLength = bodyLen + HEADER_LENGTH;
if (packetLength === 0 || bufLength < packetLength) {
return false;
}
const packet = this._buf.slice(0, packetLength);
// 调用反序列化方法获取对象
const obj = protocol.decode(packet, this.options);
this.emit(obj.packetType, obj);
const restLen = bufLength - packetLength;
if (restLen) {
this._buf = this._buf.slice(packetLength);
return true;
}
this._buf = null;
return false;
}

_destroy() {
this._buf = null;
this.emit('close');
}
}

module.exports = DubboDecoder;

0 comments on commit e280fdb

Please sign in to comment.