Skip to content
Permalink
Browse files
add runtime trace subscribe and improve binary => byte
  • Loading branch information
hufeng committed Jun 26, 2018
1 parent d5877fb commit baa708b5f407da9c25c7bf978a0fbd1b2000e124
Showing 18 changed files with 173 additions and 151 deletions.
@@ -7,6 +7,16 @@ const dubbo = (module.exports = new Dubbo({
service,
}));

// dubbo.ready().then(() => {
// console.log('dubbo was ready');
// });

dubbo.subscribe({
onTrace: msg => {
console.log(msg);
},
});

//cost middleware
/*dubbo.use(async function costTime(ctx, next) {
console.log('before dubbo cost middleware');
@@ -0,0 +1,17 @@
import {fromBytes4, fromBytes8, toBytes4, toBytes8} from '../byte';

it('test byte4', () => {
const MAX = 4294967295; //2 ** 32 - 1
for (let i = MAX; i > MAX - 100; i--) {
const byte = toBytes4(i);
expect(fromBytes4(byte)).toEqual(i);
}
});

it('test byte8', () => {
const MAX = Number.MAX_SAFE_INTEGER - 1;
for (let i = MAX; i > MAX - 100; i--) {
const byteBuf = toBytes8(i);
expect(fromBytes8(byteBuf)).toEqual(i);
}
});

This file was deleted.

@@ -15,15 +15,30 @@
* limitations under the License.
*/

import {EventEmitter} from 'events';
// Merge pull request #27 from hsiaosiyuan0/master
// 3ks hsiaosiyuan0

//use const enum instead of enum
//less runtime object
//inline value
export const enum MSG_TYPE {
SYS_ERR = 'SYS:ERR',
SYS_READY = 'SYS_READY',
SYS_STATISTICS = 'SYS_STATISTICS',
}
export const toBytes4 = (num: number) => {
const buf = Buffer.allocUnsafe(4);
buf.writeUInt32BE(num, 0);
return buf;
};

export const msg = new EventEmitter();
export const fromBytes4 = (buf: Buffer) => {
return buf.readUInt32BE(0);
};

export const toBytes8 = (num: number) => {
const buf = Buffer.allocUnsafe(8);
const high = Math.floor(num / Math.pow(2, 32));
const low = (num & 0xffffffff) >>> 0;
buf.writeUInt32BE(high, 0);
buf.writeUInt32BE(low, 4);
return buf;
};

export const fromBytes8 = (buf: Buffer) => {
const high = buf.readUInt32BE(0);
const low = buf.readUInt32BE(4);
return high * Math.pow(2, 32) + low;
};
@@ -16,7 +16,7 @@
*/

import debug from 'debug';
import {convertBinaryNum} from './binary';
import {fromBytes4} from './byte';
import HeartBeat from './heartbeat';
import {IObservable, TDecodeBuffSubscriber} from './types';
import {noop} from './util';
@@ -102,14 +102,9 @@ export default class DecodeBuffer

//取出头部字节
const header = this._buffer.slice(0, HEADER_LENGTH);
//计算body的长度
const bodyLengthBuff = Buffer.from([
header[12],
header[13],
header[14],
header[15],
]);
const bodyLength = convertBinaryNum(bodyLengthBuff, 4);
//计算body的长度字节位置[12-15]
const bodyLengthBuff = this._buffer.slice(12, 16);
const bodyLength = fromBytes4(bodyLengthBuff);
log('body length', bodyLength);

//判断是不是心跳
@@ -17,7 +17,7 @@

import debug from 'debug';
import Hessian from 'hessian.js';
import {convertBinaryNum} from './binary';
import {fromBytes8} from './byte';
import {DubboDecodeError} from './err';
import {IDubboResponse} from './types';

@@ -51,17 +51,9 @@ export function decode<T>(bytes: Buffer): IDubboResponse<T> {
let err = null;

// set request and serialization flag.
const requestIdBuff = Buffer.alloc(8);
requestIdBuff[0] = bytes[4];
requestIdBuff[1] = bytes[5];
requestIdBuff[2] = bytes[6];
requestIdBuff[3] = bytes[7];
requestIdBuff[4] = bytes[8];
requestIdBuff[5] = bytes[9];
requestIdBuff[6] = bytes[10];
requestIdBuff[7] = bytes[11];

const requestId = convertBinaryNum(requestIdBuff, 8);
//字节位置[4-11] 8 bytes
const requestIdBuff = bytes.slice(4, 12);
const requestId = fromBytes8(requestIdBuff);
log(`decode parse requestId: ${requestId}`);

// const typeId = bytes[2];
@@ -19,7 +19,6 @@ import debug from 'debug';
import compose from 'koa-compose';
import config from './config';
import Context from './context';
import {msg, MSG_TYPE} from './msg';
import queue from './queue';
import Scheduler from './scheduler';
import {to} from './to';
@@ -28,17 +27,20 @@ import {
IDubboProvider,
IDubboSubscriber,
IObservable,
ITrace,
Middleware,
TDubboService,
} from './types';
import {noop} from './util';
import {msg, noop, traceErr} from './util';
const version = require('../package.json').version;

const log = debug('dubbo:bootstrap');
log('dubbo2.js version :=> %s', require('../package.json').version);
log('dubbo2.js version :=> %s', version);

//定位没有处理的promise
process.on('unhandledRejection', (reason, p) => {
log(reason, p);
traceErr(new Error(reason));
});

/**
@@ -79,9 +81,7 @@ export default class Dubbo<TService = Object>

this._readyResolve = noop;
this._subscriber = {
onReady: noop,
onSysError: noop,
onStatistics: noop,
onTrace: noop,
};
//初始化消息监听
this._initMsgListener();
@@ -162,6 +162,7 @@ export default class Dubbo<TService = Object>
await fn(ctx);
} catch (err) {
log(err);
traceErr(err);
}

return ctx.body;
@@ -218,17 +219,17 @@ export default class Dubbo<TService = Object>
//================private method================
private _initMsgListener() {
process.nextTick(() => {
this._subscriber.onTrace({
type: 'INFO',
msg: `dubbo:bootstrap version => ${version}`,
});
msg
.addListener(MSG_TYPE.SYS_READY, () => {
this._readyResolve();
this._subscriber.onReady();
})
.addListener(MSG_TYPE.SYS_ERR, err => {
this._subscriber.onSysError(err);
.addListener('sys:trace', (msg: ITrace) => {
this._subscriber.onTrace(msg);
})
.addListener(MSG_TYPE.SYS_STATISTICS, stat =>
this._subscriber.onStatistics(stat),
);
.addListener('sys:ready', () => {
this._readyResolve();
});
});
}

@@ -17,7 +17,7 @@

import debug from 'debug';
import Hessian from 'hessian.js';
import {binaryNum} from './binary';
import {toBytes8} from './byte';
import Context from './context';
import {DubboEncodeError} from './err';

@@ -93,19 +93,15 @@ export default class DubboEncoder {
}

//body长度int-> 4个byte
const bodyLengthBuff = binaryNum(payload, 4);
header[12] = bodyLengthBuff[0];
header[13] = bodyLengthBuff[1];
header[14] = bodyLengthBuff[2];
header[15] = bodyLengthBuff[3];

header.writeUInt32BE(payload, 12);
return header;
}

private setRequestId(header) {
const {requestId} = this._ctx;
log(`encode header requestId: ${requestId}`);
const buffer = binaryNum(requestId, 8);
const buffer = toBytes8(requestId);
header[4] = buffer[0];
header[5] = buffer[1];
header[6] = buffer[2];
@@ -20,11 +20,10 @@ import config from './config';
import Context from './context';
import DubboUrl from './dubbo-url';
import {DubboMethodParamHessianTypeError, DubboTimeoutError} from './err';
import {msg, MSG_TYPE} from './msg';
import SocketWorker from './socket-worker';
import statistics from './statistics';
import {IObservable, TQueueObserver, TRequestId} from './types';
import {isDevEnv, noop} from './util';
import {isDevEnv, noop, traceErr} from './util';

const log = debug('dubbo:queue');

@@ -139,10 +138,11 @@ export class Queue implements IObservable<TQueueObserver> {
}

const {
uuid,
request: {dubboInterface, methodName},
} = ctx;
log('queue schedule failed requestId#%d, err: %s', requestId, err);
err.message = `invoke ${dubboInterface}#${methodName} was error, ${
err.message = `uuid: ${uuid} invoke ${dubboInterface}#${methodName} was error, ${
err.message
}`;
//删除该属性,不然会导致JSON.Stringify失败
@@ -152,6 +152,7 @@ export class Queue implements IObservable<TQueueObserver> {
ctx.cleanTimeout();
ctx.reject(err);
this._clear(requestId);
traceErr(err);
}

consume(requestId: TRequestId, node: SocketWorker, providerMeta: DubboUrl) {
@@ -167,7 +168,11 @@ export class Queue implements IObservable<TQueueObserver> {
request.dubboVersion = providerMeta.dubboVersion;
request.group = request.group || providerMeta.group;
request.path = providerMeta.path;
node.write(ctx);
try {
node.write(ctx);
} catch (err) {
this.failed(requestId, err);
}
if (isDevEnv) {
log(`current schedule queue ==>`, this.scheduleQueue);
}
@@ -188,8 +193,6 @@ export class Queue implements IObservable<TQueueObserver> {
//调度完成,显示调度结果
if (this._requestQueue.size === 0) {
log('invoke statistics==>%o', statistics);
//通知外部
msg.emit(MSG_TYPE.SYS_STATISTICS, statistics);
}
}

@@ -19,6 +19,7 @@ let uniqueId = 0;

export function id() {
uniqueId = ++uniqueId;
//reset
if (uniqueId === Number.MAX_SAFE_INTEGER) {
uniqueId = 0;
}

0 comments on commit baa708b

Please sign in to comment.