Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

修复zk的状态引起的两个问题 #140

Merged
merged 2 commits into from Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 33 additions & 1 deletion packages/dubbo/src/registry/zookeeper.ts
Expand Up @@ -17,7 +17,7 @@

import debug from 'debug';
import ip from 'ip';
import zookeeper from 'node-zookeeper-client';
import zookeeper, {State} from 'node-zookeeper-client';
import qs from 'querystring';
import DubboUrl from '../dubbo-url';
import {
Expand All @@ -36,6 +36,7 @@ import Registry from './registry';

const log = debug('dubbo:zookeeper');
const ipAddress = ip.address();
const CHECK_TIME = 30 * 1000;

export class ZkRegistry extends Registry<IZkClientProps & IDubboRegistryProps> {
constructor(props: IZkClientProps & IDubboRegistryProps) {
Expand All @@ -49,6 +50,7 @@ export class ZkRegistry extends Registry<IZkClientProps & IDubboRegistryProps> {
this._connect(this._init);
}

private _checkTimer: NodeJS.Timer;
private _client: zookeeper.Client;
private _agentAddrSet: Set<string>;

Expand Down Expand Up @@ -105,6 +107,35 @@ export class ZkRegistry extends Registry<IZkClientProps & IDubboRegistryProps> {
this._subscriber.onData(this._allAgentAddrSet);
};

/**
* 重连
*/
private _reconnect() {
if (this._client) {
this._client.close();
}
this._connect(this._init);
}

/**
* 由于zk自己的监测机制不明确, 改为自主检测
*/
private _monitor() {
clearInterval(this._checkTimer);
this._checkTimer = setInterval(() => {
const state = this._client.getState();
switch (state) {
case State.EXPIRED:
case State.DISCONNECTED:
log(`checker is error, state is ${state}, need reconnect`);
this._reconnect();
break;
default:
log(`checker is ok, state is ${state}`);
}
}, CHECK_TIME);
}

/**
* 获取所有的负载列表,通过agentAddrMap聚合出来
* 这样有点Reactive的感觉,不需要考虑当中增加删除的动作
Expand Down Expand Up @@ -171,6 +202,7 @@ export class ZkRegistry extends Registry<IZkClientProps & IDubboRegistryProps> {
clearTimeout(timeId);
callback(null);
msg.emit('sys:ready');
this._monitor();
});

//the connection between client and server is dropped.
Expand Down
8 changes: 7 additions & 1 deletion packages/dubbo/src/scheduler.ts
Expand Up @@ -131,7 +131,13 @@ export default class Scheduler {
log(err);
//说明zookeeper连接不上
if (err instanceof ZookeeperTimeoutError) {
this._status = STATUS.FAILED;
switch (this._status) {
// 当zk已经初始化完成后, 相应的provider已经找到了, 如果zk这时出现问题, 不应该让provider不允许调用
case STATUS.READY:
break;
default:
this._status = STATUS.FAILED;
}
}
};

Expand Down