Skip to content
Permalink
Browse files
细化scheduler错误处理,server-agent=> dubbo-agent增加运行期间追踪能力,fixed zookeeper …
…自动重连不需要判等 bug
  • Loading branch information
hufeng committed Jul 18, 2018
1 parent a51984c commit 983547158fbf491bd7c75b4f3094e59b51c94b7a
Showing 4 changed files with 46 additions and 19 deletions.
@@ -27,7 +27,7 @@ const log = debug('dubbo:server-agent');
* 机器agent和socket-worker的管理容器
* Agent可以理解为一台dubbo service的负载
*/
export class ServerAgent implements IObservable<ISocketSubscriber> {
export class DubboAgent implements IObservable<ISocketSubscriber> {
constructor() {
this._serverAgentMap = new Map();
this._subscriber = {
@@ -47,7 +47,6 @@ export class ServerAgent implements IObservable<ISocketSubscriber> {
from = (agentAddrs: Set<string>) => {
log('create server-agent :|> %O', agentAddrs);
//获取负载host:port列表
//根据负载创建连接池
process.nextTick(() => {
for (let agentAddr of agentAddrs) {
//如果负载中存在该负载,继续下一个
@@ -134,14 +133,30 @@ export class ServerAgent implements IObservable<ISocketSubscriber> {
): Array<SocketWorker> {
let availableList = [];
for (let agentAddr of agentAddrList) {
const socketWorker = this._serverAgentMap.get(agentAddr);
if (socketWorker && socketWorker.isAvaliable) {
availableList.push(socketWorker);
if (this._serverAgentMap.has(agentAddr)) {
const socketWorker = this._serverAgentMap.get(agentAddr);
if (socketWorker.isAvaliable) {
availableList.push(socketWorker);
} else {
traceErr(
new Error(
`${agentAddrList.join()}:|>${agentAddr} current status is ${
socketWorker.status
}`,
),
);
}
} else {
traceErr(
new Error(
`${agentAddrList.join()}:|>${agentAddr} not match socket-worker`,
),
);
}
}
return availableList;
}
}

//expose sington
export default new ServerAgent();
export default new DubboAgent();
@@ -16,9 +16,9 @@
*/

import debug from 'debug';
import dubboAgent, {DubboAgent} from './dubbo-agent';
import {ScheduleError, SocketError, ZookeeperTimeoutError} from './err';
import queue from './queue';
import serverAgent, {ServerAgent} from './server-agent';
import {IZkClientProps} from './types';
import {traceErr, traceInfo} from './util';
import {ZkRegistry} from './zookeeper';
@@ -28,6 +28,7 @@ const enum STATUS {
PADDING = 'padding',
READY = 'ready',
FAILED = 'failded',
NO_AGENT = 'no_agent',
}

/**
@@ -53,7 +54,7 @@ export default class Scheduler {

private _status: STATUS;
private _zkClient: ZkRegistry;
private _serverAgent: ServerAgent;
private _serverAgent: DubboAgent;

/**
* static factory method
@@ -79,12 +80,16 @@ export default class Scheduler {
case STATUS.PADDING:
log('current scheduler was padding');
break;
case STATUS.NO_AGENT:
this._handleFailed(
requestId,
new ScheduleError('Zookeeper Can not be find any agents'),
);
break;
case STATUS.FAILED:
this._handleFailed(
requestId,
new ScheduleError(
'Schedule error, ZooKeeper Could not be connected or can not find any agents!',
),
new ScheduleError('ZooKeeper Could not be connected'),
);
break;
}
@@ -99,7 +104,7 @@ export default class Scheduler {

//如果负载为空,也就是没有任何provider提供服务
if (agentSet.size === 0) {
this._status = STATUS.FAILED;
this._status = STATUS.NO_AGENT;
//将队列中的所有dubbo调用全调用失败
const err = new ScheduleError('Can not be find any agents');
queue.allFailed(err);
@@ -108,7 +113,7 @@ export default class Scheduler {
}

//初始化serverAgent
this._serverAgent = serverAgent.from(agentSet).subscribe({
this._serverAgent = dubboAgent.from(agentSet).subscribe({
onConnect: this._handleOnConnect,
onData: this._handleOnData,
onClose: this._handleOnClose,
@@ -109,6 +109,10 @@ export default class SocketWorker implements IObservable<ISocketSubscriber> {
this._socket.write(encoder.encode());
}

get status() {
return this._status;
}

/**
* current status is whether avaliable or not
*/
@@ -161,14 +161,17 @@ export class ZkRegistry implements IObservable<IRegistrySubscriber> {
log('dubboServiceUrl:|> %O', this._dubboServiceUrlMap);
}

if (!eqSet(this._agentAddrSet, this._allAgentAddrSet)) {
this._agentAddrSet = this._allAgentAddrSet;
this._subscriber.onData(this._allAgentAddrSet);
} else {
log('no agent change');
}
this._agentAddrSet = this._allAgentAddrSet;
this._subscriber.onData(this._allAgentAddrSet);
};

/**
* get current all agent address
*/
get allAgentAddrSet() {
return this._agentAddrSet;
}

/**
* 获取所有的负载列表,通过agentAddrMap聚合出来
* 这样有点Reactive的感觉,不需要考虑当中增加删除的动作

0 comments on commit 9835471

Please sign in to comment.