Skip to content

Commit

Permalink
自适应负载均衡优化 (#309)
Browse files Browse the repository at this point in the history
* 优化客户端自适应负载均衡效果,修复客户端生产不均匀问题
  • Loading branch information
llIlll committed Sep 25, 2020
1 parent 2db5fea commit 95b3f5e
Show file tree
Hide file tree
Showing 18 changed files with 116 additions and 428 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ protected Nodes getTopicNodes(TopicMetadata topicMetadata, List<BrokerNode> brok
if (node == null) {
node = new Node();
node.setUrl(String.valueOf(brokerNode.getId()));
node.setNearby(brokerNode.isNearby());

if (topicMetadata.getProducerPolicy() != null && topicMetadata.getProducerPolicy().getNearby() != null) {
node.setNearby(topicMetadata.getProducerPolicy().getNearby() && brokerNode.isNearby());
} else {
node.setNearby(false);
}

Node oldNode = brokerNode.putIfAbsentAttachment(NODE_CACHE_KEY, node);
if (oldNode != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/
package org.joyqueue.client.internal.producer.support;

import com.jd.laf.extension.Extension;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.RandomUtils;
import org.joyqueue.client.internal.metadata.domain.PartitionGroupMetadata;
import org.joyqueue.client.internal.metadata.domain.PartitionNode;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.producer.domain.ProduceMessage;
Expand All @@ -30,7 +31,6 @@
* author: gaohaoxiang
* date: 2018/12/27
*/
@Extension(singleton = false)
public class WeightedPartitionSelector extends AbstractPartitionSelector {

public static final String NAME = "weighted";
Expand All @@ -41,13 +41,25 @@ protected PartitionNode nextPartition(ProduceMessage message, TopicMetadata topi
double weight = 0;
int index = 0;

for (BrokerNode brokerNode : brokerNodes) {
weights[index] = brokerNode.getWeight();
if (weights[index] < 0) {
weights[index] = 0;
if (topicMetadata.getProducerPolicy() != null && MapUtils.isNotEmpty(topicMetadata.getProducerPolicy().getWeight())) {
for (BrokerNode brokerNode : brokerNodes) {
weights[index] = brokerNode.getWeight();
if (weights[index] < 0) {
weights[index] = 0;
}
weight += weights[index];
index++;
}
} else {
for (BrokerNode brokerNode : brokerNodes) {
List<PartitionGroupMetadata> brokerPartitionGroups = topicMetadata.getBrokerPartitionGroups(brokerNode.getId());
weights[index] = (brokerPartitionGroups != null ? brokerPartitionGroups.size() * 10 : brokerNode.getWeight());
if (weights[index] < 0) {
weights[index] = 0;
}
weight += weights[index];
index++;
}
weight += weights[index];
index++;
}

if (weight > 0) {
Expand Down
4 changes: 0 additions & 4 deletions joyqueue-client/joyqueue-client-loadbalance-adaptive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package org.joyqueue.client.loadbalance.adaptive;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.joyqueue.client.loadbalance.adaptive.config.AdaptiveLoadBalanceConfig;
import org.joyqueue.client.loadbalance.adaptive.node.Metrics;
import org.joyqueue.client.loadbalance.adaptive.node.Node;
import org.joyqueue.client.loadbalance.adaptive.node.Nodes;
import org.joyqueue.client.loadbalance.adaptive.node.WeightNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* AdaptiveLoadBalance
Expand All @@ -19,20 +18,19 @@
*/
public class AdaptiveLoadBalance {

protected static final Logger logger = LoggerFactory.getLogger(AdaptiveLoadBalance.class);

private AdaptiveLoadBalanceConfig config;
private List<ScoreJudge> scoreJudges;

private WeightLoadBalance weightLoadBalance = new WeightLoadBalance();
private RandomLoadBalance randomLoadBalance = new RandomLoadBalance();

private Cache<String, Node> selectCache;

public AdaptiveLoadBalance(AdaptiveLoadBalanceConfig config) {
this.config = config;
this.scoreJudges = getScoreJudges(config);
this.selectCache = CacheBuilder.newBuilder()
.expireAfterWrite(config.getComputeInterval(), TimeUnit.MILLISECONDS)
.build();
Metrics.cacheInterval = config.getCacheInterval();
Metrics.sliceInterval = config.getSliceInterval();
}

protected List<ScoreJudge> getScoreJudges(AdaptiveLoadBalanceConfig config) {
Expand All @@ -53,18 +51,11 @@ public Node select(Nodes nodes) {
}

protected boolean isStartup(Nodes nodes) {
return nodes.getMetric().getTps() > config.getSsthreshhold();
return nodes.getMetric().getCount() > config.getSsthreshhold();
}

protected Node adaptiveSelect(Nodes nodes) {
String cacheKey = nodes.toString();
try {
return selectCache.get(cacheKey, () -> {
return doAdaptiveSelect(nodes);
});
} catch (ExecutionException e) {
return doAdaptiveSelect(nodes);
}
return doAdaptiveSelect(nodes);
}

public Node doAdaptiveSelect(Nodes nodes) {
Expand All @@ -75,9 +66,6 @@ public Node doAdaptiveSelect(Nodes nodes) {
double compute = scoreJudge.compute(nodes, node);
score += (compute / 100 * scoreJudge.getRatio());
}
if (score == 0) {
score = 1;
}
weightNodes.add(new WeightNode(node, score));
}
return weightSelect(weightNodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
*/
public class AdaptiveLoadBalanceConfig {

private int ssthreshhold = 10;
private int ssthreshhold = 1000;
private String[] judges;
private int computeInterval = 1000 * 1;
private int cacheInterval = 1000 * 1;
private int sliceInterval = 1000 * 60;

public int getSsthreshhold() {
return ssthreshhold;
Expand All @@ -27,11 +28,19 @@ public void setJudges(String[] judges) {
this.judges = judges;
}

public int getComputeInterval() {
return computeInterval;
public int getCacheInterval() {
return cacheInterval;
}

public void setComputeInterval(int computeInterval) {
this.computeInterval = computeInterval;
public void setCacheInterval(int cacheInterval) {
this.cacheInterval = cacheInterval;
}

public int getSliceInterval() {
return sliceInterval;
}

public void setSliceInterval(int sliceInterval) {
this.sliceInterval = sliceInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,10 @@ public class AvailableScoreJudge implements ScoreJudge {

@Override
public double compute(Nodes nodes, Node node) {
if (node.getMetric().getErrorTps() == 0) {
return 0;
} else if (nodes.getMetric().getErrorTps() == 0) {
return -100;
} else if (node.getMetric().getErrorTps() >= nodes.getMetric().getErrorTps()) {
return -100;
} else {
return -Math.min(100 - ((double) (node.getMetric().getErrorTps() / nodes.getMetric().getErrorTps()) * 100), 100);
if (node.getMetric().getErrorCount() != 0) {
return -Integer.MAX_VALUE;
}
return 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,29 @@
import org.joyqueue.client.loadbalance.adaptive.node.Nodes;

/**
* TP99ScoreJudge
* AvgScoreJudge
* author: gaohaoxiang
* date: 2020/8/10
*/
public class TP99ScoreJudge implements ScoreJudge {
public class AvgScoreJudge implements ScoreJudge {

private static final double BASE_SCORE = 50;
public static int exceptionThreshhold = 500;

@Override
public double compute(Nodes nodes, Node node) {
double score = node.getMetric().getTp99() - nodes.getMetric().getTp99();
if (score > 0) {
return 0;
} else if (score == 0) {
return BASE_SCORE;
double maxAvg = 0;
for (Node otherNode : nodes.getNodes()) {
if (!otherNode.getUrl().equals(node.getUrl())) {
maxAvg = Math.max(maxAvg, otherNode.getMetric().getAvg());
}
}

double score = node.getMetric().getAvg() / maxAvg * 100;

if (score > exceptionThreshhold) {
return -Integer.MAX_VALUE;
} else {
return BASE_SCORE + Math.min(format(-score), BASE_SCORE);
return 100;
}
}

Expand All @@ -42,6 +48,6 @@ public double getRatio() {

@Override
public String type() {
return "tp99";
return "avg";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public double compute(Nodes nodes, Node node) {
if (node.isNearby()) {
return 100;
}
return 0;
return 1;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ public class Metric {
private Metrics all = new Metrics();
private Metrics error = new Metrics();

public void refresh() {
all.refresh();
error.reinit();
}

public Tracer begin() {
return new Tracer(this);
}
Expand Down Expand Up @@ -53,10 +58,6 @@ public double getTp75() {
return all.getTp75();
}

public double getTp90() {
return all.getTp90();
}

public double getMax() {
return all.getMax();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,45 @@
*/
public class Metrics {

private Meter meter;
private Reservoir reservoir;
private Histogram histogram;
public static int cacheInterval = 1000 * 1;
public static int sliceInterval = 1000 * 60;

private volatile Meter meter;
private volatile Reservoir reservoir;
private volatile Histogram histogram;

private volatile long lastSlice;

private volatile double lastAvg;
private volatile long lastAvgTime;

public Metrics() {
init();
}

public void slice() {
init();
public void init() {
this.meter = new Meter();
this.reservoir = new ExponentiallyDecayingReservoir();
this.histogram = new Histogram(reservoir);
}

protected void init() {
this.meter = new Meter();
public void slice() {
this.reservoir = new ExponentiallyDecayingReservoir();
this.histogram = new Histogram(reservoir);
}

public void mark() {
this.mark(1L);
public void refresh() {
if (System.currentTimeMillis() - lastSlice > sliceInterval) {
lastSlice = System.currentTimeMillis();
slice();
}
}

public void mark(long count) {
this.meter.mark(count);
public void reinit() {
if (System.currentTimeMillis() - lastSlice > sliceInterval) {
lastSlice = System.currentTimeMillis();
init();
}
}

public void mark(long count, double time) {
Expand Down Expand Up @@ -72,10 +87,6 @@ public double getTp75() {
return this.getSnapshot().get75thPercentile();
}

public double getTp90() {
return this.getSnapshot().getMean();
}

public double getMax() {
return this.getSnapshot().getMax();
}
Expand All @@ -85,7 +96,11 @@ public double getMin() {
}

public double getAvg() {
return this.getSnapshot().getMean();
if (System.currentTimeMillis() - lastAvgTime > cacheInterval) {
lastAvg = this.getSnapshot().getMean();
lastAvgTime = System.currentTimeMillis();
}
return lastAvg;
}

protected Snapshot getSnapshot() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public void setUrl(String url) {
}

public Metric getMetric() {
metric.refresh();
return metric;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public void setMetric(Metric metric) {
}

public Metric getMetric() {
metric.refresh();
return metric;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
org.joyqueue.client.loadbalance.adaptive.judge.RegionScoreJudge
org.joyqueue.client.loadbalance.adaptive.judge.TP99ScoreJudge
#org.joyqueue.client.loadbalance.adaptive.judge.RegionScoreJudge
org.joyqueue.client.loadbalance.adaptive.judge.AvgScoreJudge
org.joyqueue.client.loadbalance.adaptive.judge.AvailableScoreJudge
Loading

0 comments on commit 95b3f5e

Please sign in to comment.