Skip to content
Permalink
Browse files
1. remove socket-pool 2. add retry times 3. refactor server-agent 4. …
…improve sysTrace
  • Loading branch information
hufeng committed Jul 9, 2018
1 parent 49fd692 commit 2a59d75791ccb61f8218cd085779951be712f151
Showing 5 changed files with 108 additions and 163 deletions.
@@ -83,7 +83,7 @@ export default class Scheduler {
this._handleFailed(
requestId,
new ScheduleError(
'Schedule error, ZooKeeper Could not be connected!',
'Schedule error, ZooKeeper Could not be connected or can not find any agents!',
),
);
break;
@@ -143,16 +143,20 @@ export default class Scheduler {
//get request context
const ctx = queue.requestQueue.get(requestId);
//get socket agent list
const agentAddrSet = this._zkClient.getAgentAddrList(ctx);
log('agentAddrSet-> %O', agentAddrSet);
const agentAddrList = this._zkClient.getAgentAddrList(ctx);
log('agentAddrSet-> %O', agentAddrList);
const worker = this._serverAgent.getAvailableSocketWorker(agentAddrList);

const worker = this._serverAgent.getAvailableSocketWorker(agentAddrSet);
//if could not find any available socket agent worker
if (!worker) {
const {requestId, dubboInterface, version, group} = ctx;
const msg = `requestId#${requestId}:Could not find any agent address with ${dubboInterface}#${version}#${group}`;
log(msg);
this._handleFailed(requestId, new ScheduleError(msg));
const msg = `requestId#${requestId}:Could not find any agent worker with ${dubboInterface}#${version}#${group} agentList: ${agentAddrList.join(
',',
)}`;
const err = new ScheduleError(msg);
this._handleFailed(requestId, err);
log(err);
traceErr(err);
return;
}

@@ -16,7 +16,6 @@
*/

import debug from 'debug';
import SocketPool from './socket-pool';
import SocketWorker from './socket-worker';
import {IObservable, ISocketSubscriber} from './types';
import {isDevEnv, noop, traceErr, traceInfo} from './util';
@@ -25,7 +24,7 @@ import {TAgentAddr} from './zookeeper';
const log = debug('dubbo:server-agent');

/**
* 机器agent和socket-pool的管理容器
* 机器agent和socket-worker的管理容器
* Agent可以理解为一台dubbo service的负载
*/
export class ServerAgent implements IObservable<ISocketSubscriber> {
@@ -39,32 +38,41 @@ export class ServerAgent implements IObservable<ISocketSubscriber> {
}

private _subscriber: ISocketSubscriber;
private readonly _serverAgentMap: Map<TAgentAddr, SocketPool>;
private readonly _serverAgentMap: Map<TAgentAddr, SocketWorker>;

/**
* static factor method
* @param agentAddrList 负载地址列表
*/
from = (agentAddrs: Set<string>) => {
log('agerntAddrs: %O', agentAddrs);
log('create server-agent :|> %O', agentAddrs);
//获取负载host:port列表
//根据负载创建连接池
process.nextTick(() => {
for (let agentAddr of agentAddrs) {
//如果负载中存在该负载,继续下一个
if (this._serverAgentMap.has(agentAddr)) {
//when current worker was retry, add retry chance
const worker = this._serverAgentMap.get(agentAddr);
if (worker.isRetry) {
log(`${agentAddr} was retry`);
//add retry chance
worker.resetRetry();
}
continue;
}
log(`create ServerAgent: ${agentAddr}`);
traceInfo(`ServerAgent create socket-pool: ${agentAddr}`);
const socketPool = SocketPool.from(agentAddr).subscribe({
traceInfo(`ServerAgent create SocketWorker: ${agentAddr}`);
const socketWorker = SocketWorker.from(agentAddr).subscribe({
onConnect: this._subscriber.onConnect,
onData: this._subscriber.onData,
onClose: ({pid, host, port}) => {
this._clearClosedPool(host + ':' + port);
onClose: ({host, pid, port}) => {
//delete close worker
this._clearCloseWorker(host + ':' + port);
//notify scheduler
this._subscriber.onClose({pid});
},
});
this._serverAgentMap.set(agentAddr, socketPool);
this._serverAgentMap.set(agentAddr, socketWorker);
}
});

@@ -75,33 +83,38 @@ export class ServerAgent implements IObservable<ISocketSubscriber> {
* 获取可用负载对应的socketWorker
* @param agentAddrList
*/
getAvailableSocketWorker(agentAddrList: Array<TAgentAddr>): SocketWorker {
getAvailableSocketWorker(
agentAddrList: Array<TAgentAddr> = [],
): SocketWorker {
const availableAgentList = this._getAvailableSocketAgents(agentAddrList);
const len = availableAgentList.length;

if (len === 0) {
traceErr(
new Error(
`agentAddrList->${agentAddrList.join()} could not find any avaliable socekt worker`,
),
);
return null;
} else if (len === 1) {
return availableAgentList[0].worker;
return availableAgentList[0];
} else {
return availableAgentList[Math.floor(Math.random() * len)].worker;
//match random
return availableAgentList[Math.floor(Math.random() * len)];
}
}

private _clearClosedPool = (agentAddr: string) => {
const socketPool = this._serverAgentMap.get(agentAddr);
if (socketPool.isAllClose) {
//如果全部关闭
log(
`${agentAddr}'s pool socket-worker had all closed. delete ${agentAddr}`,
);
this._serverAgentMap.delete(agentAddr);
traceErr(
new Error(
`${agentAddr}'s pool socket-worker had all closed. delete ${agentAddr}`,
),
);
}
/**
* remove close socket-worker from server agent
*/
private _clearCloseWorker = (agentAddr: string) => {
//如果全部关闭
log(`socket-worker#${agentAddr} was closed. delete this socket worker`);
this._serverAgentMap.delete(agentAddr);
traceErr(
new Error(
`socket-worker#${agentAddr} was closed. delete this socket worker`,
),
);
if (isDevEnv) {
log('SocketAgent current agentHost->', this._serverAgentMap.keys());
}
@@ -118,16 +131,17 @@ export class ServerAgent implements IObservable<ISocketSubscriber> {
*/
private _getAvailableSocketAgents(
agentAddrList: Array<TAgentAddr>,
): Array<SocketPool> {
): Array<SocketWorker> {
let availableList = [];
for (let agentAddr of agentAddrList) {
const socketPool = this._serverAgentMap.get(agentAddr);
if (socketPool && socketPool.hasAvaliableNodes) {
availableList.push(socketPool);
const socketWorker = this._serverAgentMap.get(agentAddr);
if (socketWorker && socketWorker.isAvaliable) {
availableList.push(socketWorker);
}
}
return availableList;
}
}

//expose sington
export default new ServerAgent();

This file was deleted.

0 comments on commit 2a59d75

Please sign in to comment.