Skip to content

Commit

Permalink
refactor: choose addresses in loadbalancer (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
gxcsoccer authored Jun 4, 2019
1 parent 1c0092e commit fb29b09
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 192 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ language: node_js
node_js:
- '8'
- '10'
- '12'
before_install:
- 'wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz'
- 'tar xf zookeeper-3.4.6.tar.gz'
Expand Down
166 changes: 41 additions & 125 deletions lib/client/address_group.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

const assert = require('assert');
const Base = require('sdk-base');
const utils = require('./utils');
const Table = require('easy-table');
const sleep = require('mz-modules/sleep');
const { printAddresses } = require('./utils');
const DynamicConfig = require('./dynamic_config');
const HealthCounter = require('./metric/health_counter');
const createLoadBalancer = require('./loadbalancer');
Expand All @@ -27,10 +27,7 @@ class AddressGroup extends Base {
this._inited = false;
this._closed = false;
this._addressList = null; // 生效地址列表
this._choosedSize = 0; // 生效的地址数量
this._allAddressList = null; // 原始地址列表
this._totalSize = 0; // 完整的地址数量
this._unChoosedAddressList = null; // 未被选中的地址
this._faultAddressMap = new Map(); // 故障地址列表
this._weightMap = new Map(); // <host, weight>
this._maxIsolatedNum = 0; // 最大的故障摘除数量,如果故障机器超过这个值则禁用摘除功能
Expand Down Expand Up @@ -89,7 +86,7 @@ class AddressGroup extends Base {
const prev = this._connectionPoolSize;
this._connectionPoolSize = val;
this.logger.info('[AddressGroup] adjust connection pool size from %s to %s, group=%s', prev, val, this.key);
this._refresh();
this.refresh();
}
}

Expand All @@ -106,16 +103,31 @@ class AddressGroup extends Base {
return this.options.loadbalancerClass;
}

get totalSize() {
return this._allAddressList ? this._allAddressList.length : 0;
}

get choosedSize() {
return this._addressList ? this._addressList.length : 0;
}

// Complete (original) address list as stored in `_allAddressList`; may be null before it is first set
get allAddressList() {
return this._allAddressList;
}

// Replace the complete (original) address list; stores `val` as-is without copying
set allAddressList(val) {
this._allAddressList = val;
}

// Effective address list as stored in `_addressList`; may be null before it is first set
get addressList() {
return this._addressList;
}

set addressList(val) {
this._addressList = this._chooseAddresses(val);
this._choosedSize = this._addressList.length;
this._loadbalancer.reset();
this._addressList = this._loadbalancer.reset(val);
const choosedSize = this.choosedSize;
// 最大熔断个数,为了防止「雪崩」,超过这个数,则关闭熔断
this._maxIsolatedNum = Math.ceil(this.faultToleranceConfig.maxIsolatedPercentage * this._choosedSize);
this._maxIsolatedNum = Math.ceil(this.faultToleranceConfig.maxIsolatedPercentage * choosedSize);

// 故障地址和权重都重置一下
const newFaultAddressMap = new Map();
Expand All @@ -130,12 +142,6 @@ class AddressGroup extends Base {
this._faultAddressMap = newFaultAddressMap;
this._degradeEnable = true;

this.logger.debug('[AddressGroup] choosing %s / %s addresses for group=%s%s', this._choosedSize, this._totalSize, this.key, printAddresses(this._addressList));
if (this._unChoosedAddressList.length) {
this.logger.debug('[AddressGroup] there are %s addresses not choosed for connection control strategy%s',
this._unChoosedAddressList.length, printAddresses(this._unChoosedAddressList));
}

if (!this.options.waitConnect && !this._inited) {
this._inited = true;
this.ready(true);
Expand All @@ -155,88 +161,23 @@ class AddressGroup extends Base {
});
}

_refresh() {
if (this._allAddressList) {
this.addressList = this._allAddressList;
checkHealthy(address) {
const unHealthy = this._faultAddressMap.has(address.host) || this._weightMap.get(address.host) < DEFAULT_WEIGHT;
const isHealthy = this.connectionManager.connections.has(address.host) && !unHealthy;
if (isHealthy) {
// healthy
return 1;
} else if (unHealthy) {
return -1;
}
// 0 代表当前健康状态还未知(还没有连接过的节点)
return 0;
}

_needElasticControl(addressCount) {
const connectionPoolConfig = this.connectionPoolConfig;
if (!connectionPoolConfig) return false;

// 如果地址不够,禁用弹性控制
if (addressCount < connectionPoolConfig.minAddressCount) {
return false;
refresh() {
if (this.totalSize > this.choosedSize) {
this.addressList = this.allAddressList;
}

const enableThreshold = connectionPoolConfig.enableThreshold || 50;
// 开启弹性控制有两个条件
// 1. 配置 elasticControl = true
// 2. 当前分组的地址数量要大于开启的阈值(enableThreshold)
return connectionPoolConfig.elasticControl && addressCount > enableThreshold;
}

/**
 * Pick the addresses that should become effective out of the given list.
 *
 * Side effects: may replace `_allAddressList` with `utils.shuffle(addressList)`,
 * and always updates `_totalSize` and `_unChoosedAddressList`.
 *
 * @param {Array} addressList - candidate addresses; each element's `host` is read
 * @return {Array} `_allAddressList` itself when elastic control does not apply or the
 *   pool size exceeds the total, otherwise a new array with the chosen addresses
 */
_chooseAddresses(addressList) {
const newSet = new Set();
const oldSet = new Set();

// Shuffle only once, when the address list has actually changed
if (this._allAddressList !== addressList) {
// Find the addresses newly pushed to us (i.e. the scaled-out ones)
if (this._allAddressList) {
for (const addr of this._allAddressList) {
oldSet.add(addr.host);
}
}
for (const addr of addressList) {
if (!oldSet.has(addr.host)) {
newSet.add(addr.host);
}
}
this._allAddressList = utils.shuffle(addressList);
}
this._totalSize = this._allAddressList.length;
this._unChoosedAddressList = [];
// Return the whole list when elastic control is disabled or there are too few addresses
if (!this._needElasticControl(this._totalSize)) {
return this._allAddressList;
}
if (this._connectionPoolSize > this._totalSize) {
this.logger.warn('[AddressGroup] group: %s needs %s addresses, but there are only %s', this.key, this._connectionPoolSize, this._totalSize);
return this._allAddressList;
}
let leftCount = this._connectionPoolSize;
if (leftCount > this.connectionPoolConfig.maxAddressCount) {
leftCount = this.connectionPoolConfig.maxAddressCount;
this.logger.info('[AddressGroup] there are %s addresses totally, exteeding the max address count: %s, group: %s',
this._totalSize, this.connectionPoolConfig.maxAddressCount, this.key);
}
const choosedAddressList = [];
const connections = this.connectionManager.connections;
for (const address of this._allAddressList) {
const unHealthy = this._faultAddressMap.has(address.host) || this._weightMap.get(address.host) < DEFAULT_WEIGHT;
const isHealthy = connections.has(address.host) && !unHealthy;
// Requirement: newly pushed addresses and already-connected addresses are chosen first.
//
// When the client cluster is large and the server cluster is small, the QPS each client sends to
// one machine may be low while the load on the server side is high. If the servers then scale out,
// the old logic (connected first) would decide no expansion is needed and never try the new machines.
if ((isHealthy || newSet.has(address.host)) && leftCount > 0) {
choosedAddressList.push(address);
leftCount--;
} else if (unHealthy) {
// Known-unhealthy addresses go to the front, so pop() reaches them last (only when short of addresses)
this._unChoosedAddressList.unshift(address);
} else {
// Addresses of unknown state go to the end, so pop() picks them first
this._unChoosedAddressList.push(address);
}
}
// Top up to the required count from the not-chosen list
while (leftCount--) {
choosedAddressList.push(this._unChoosedAddressList.pop());
}
return choosedAddressList;
}

// 统计整个分组地址列表的健康状态
Expand All @@ -248,18 +189,7 @@ class AddressGroup extends Base {
const avgRTStr = avgRT + 'ms';
const totalRequestCount = hc.totalCount;

if (this.connectionPoolConfig.elasticControl) {
// 根据上个窗口的总请求数来计算需要多少个 connection
const capacityPerConnection = this.connectionPoolConfig.capacityPerConnection;
let need = Math.ceil(totalRequestCount / capacityPerConnection);
if (need < this.connectionPoolConfig.minAddressCount) {
need = this.connectionPoolConfig.minAddressCount;
}
if (need > this.connectionPoolConfig.maxAddressCount) {
need = this.connectionPoolConfig.maxAddressCount;
}
this.connectionPoolSize = need;
}
this._loadbalancer.adjustConnectionPoolSize(totalRequestCount);

// 全局开关:如果应用打开了这个开关,则会开启整个单点故障自动剔除摘除功能,否则完全不进入该功能的逻辑
if (!this.faultToleranceConfig.regulationEffective) {
Expand Down Expand Up @@ -287,7 +217,7 @@ class AddressGroup extends Base {
// 错误率远高于平均错误率,则对该地址进行调控,降低其权重
const latestHealthCount = conn.latestHealthCount;
// 确保 connection 上个周期有统计数据,并且有请求
if (latestHealthCount && latestHealthCount.totalCount) {
if (this.degradeEnable && latestHealthCount && latestHealthCount.totalCount) {
errorRateStr = latestHealthCount.errorRate + '%';
rtStr = latestHealthCount.avgRT + 'ms';
if (avgErrorRate === 0) {
Expand Down Expand Up @@ -344,17 +274,14 @@ class AddressGroup extends Base {
// 避免太多日志
if (changed) {
this.logger.debug('[AddressGroup] group: %s weight %s, total request count: %d, avg rt: %s, avg error rate: %s, address count: %d\n%s',
this.key, changed ? 'changed' : 'unchanged', totalRequestCount, avgRTStr, avgErrorRateStr, this._choosedSize, table.toString());
this.key, changed ? 'changed' : 'unchanged', totalRequestCount, avgRTStr, avgErrorRateStr, this.choosedSize, table.toString());
}
// 如果一次降级的太多,可能造成流量全部打到部分机器,从而雪崩,所以超过某个阀值后禁用调控
if (degradeCount <= this._maxIsolatedNum) {
this._degradeEnable = true;
} else {
this._degradeEnable = false;
// 异常的机器过多,并且有足够多后补,则尝试刷新一批地址
if (degradeCount - this._maxIsolatedNum < this._unChoosedAddressList.length) {
this._refresh();
}
this.refresh();
}
}

Expand Down Expand Up @@ -385,8 +312,8 @@ class AddressGroup extends Base {
await this._connectAll(addressList);
}
// 如果重连以后还是有失败的地址,并且存在未被选中的地址,则尝试替换一波
if (this._faultAddressMap.size && this._unChoosedAddressList.length) {
this._refresh();
if (this._faultAddressMap.size) {
this.refresh();
}
await sleep(this.options.retryFaultInterval);
}
Expand All @@ -404,7 +331,7 @@ class AddressGroup extends Base {

let weight = this._weightMap.get(address.host) || DEFAULT_WEIGHT;
// 长时间没有被路由到的话,需要给一次机会做 single check
if (weight < DEFAULT_WEIGHT && Date.now() - conn.lastInvokeTime > this._maxIdleWindow) {
if (weight < DEFAULT_WEIGHT && Date.now() - conn.lastInvokeTime >= this._maxIdleWindow) {
weight = DEFAULT_WEIGHT;
}
return weight;
Expand All @@ -424,24 +351,13 @@ class AddressGroup extends Base {
/**
 * Close the address group: clear the weight and fault maps, empty the address
 * lists, mark the group as closed and close the health counter if one exists.
 */
close() {
this._weightMap.clear();
this._faultAddressMap.clear();
this._unChoosedAddressList = [];
this._allAddressList = [];
this._addressList = [];
this._choosedSize = 0;
this._closed = true;
// the health counter may be absent, so guard before closing it
if (this._healthCounter) {
this._healthCounter.close();
}
}
}

/**
 * Render an address list as a multi-line string for log output.
 *
 * Each address becomes one line built from its `href`. At most the first 20
 * addresses are shown; when there are more, a trailing notice line is added.
 *
 * @param {Array} addressList - addresses; each element's `href` is read
 * @return {String} a string starting with a newline, one line per shown address
 */
function printAddresses(addressList) {
  const LIMIT = 20;
  const rows = addressList.map(item => ' - ' + item.href);
  const shown = rows.length > LIMIT
    ? rows.slice(0, LIMIT).concat('... only 20 first addresses will be shown here!')
    : rows;
  return '\n' + shown.join('\n');
}

module.exports = AddressGroup;
2 changes: 1 addition & 1 deletion lib/client/dynamic_config.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ module.exports = {
maxIsolatedPercentage: 0.4,
leastWindowCount: 10,

leastWindowRtMultiple: 3,
leastWindowRtMultiple: 10,
recoverWindowRtMultiple: 1.5,

// 时间窗口内异常率与服务平均异常率的降级比值:在对统计信息进行计算的时候,
Expand Down
Loading

0 comments on commit fb29b09

Please sign in to comment.